Java中golang通道的等价物

我有一个要求,我需要从一组阻塞队列中读取。 阻塞队列由我正在使用的库创建。 我的代码必须从队列中读取。 我不想为每个阻塞队列创建一个读者线程。 相反,我想使用单个线程(或者可能最多使用2/3线程)轮询它们的数据可用性。 由于某些阻塞队列可能长时间没有数据,而其中一些阻塞队列可能会获得数据突发。 轮询具有较小超时的队列将起作用,但这根本不是有效的,因为它仍然需要在所有队列上保持循环,即使其中一些队列长时间没有数据。 基本上,我正在寻找阻塞队列的select / epoll(用于套接字)类型的机制。 任何线索都非常感谢。

尽管如此,在Go中这样做很容易。 下面的代码模拟了与channel和goroutines相同的内容:

package main import "fmt" import "time" import "math/rand" func sendMessage(sc chan string) { var i int for { i = rand.Intn(10) for ; i >= 0 ; i-- { sc = 0; i-- { time.Sleep(20 * time.Millisecond) c <- rand.Intn(65534) } i = 1000 + rand.Intn(24000); time.Sleep(time.Duration(i) * time.Millisecond) } } func main() { msgchan := make(chan string, 32) numchan := make(chan int, 32) i := 0 for ; i < 8 ; i++ { go sendNum(numchan) go sendMessage(msgchan) } for { select { case msg := <- msgchan: fmt.Printf("Worked on %s\n", msg) case x := <- numchan: fmt.Printf("I got %d \n", x) } } } 

我建议你考虑使用JCSP库。 相当于Go的select被称为Alternative 。 您只需要一个消耗线程,如果使用Alternative打开它们,则不需要轮询传入通道。 因此,这将是多路复用源数据的有效方式。

如果您能够使用JCSP频道替换BlockingQueues,它将会有很大帮助。 通道的行为基本相同,但在扇出或扇入共享通道末端方面提供了更大程度的灵活性,特别是在使用Alternative使用通道。

对于使用示例,这里是公平的多路复用器。 此示例演示了一个将来自其输入通道arrays的流量与其单个输出通道进行相当多路复用的过程。 无论竞争对手的热情如何,都不会缺乏输入渠道。

 import org.jcsp.lang.*; public class FairPlex implements CSProcess { private final AltingChannelInput[] in; private final ChannelOutput out; public FairPlex (final AltingChannelInput[] in, final ChannelOutput out) { this.in = in; this.out = out; } public void run () { final Alternative alt = new Alternative (in); while (true) { final int index = alt.fairSelect (); out.write (in[index].read ()); } } } 

请注意,如果priSelect使用priSelect ,如果索引较低的频道不断要求服务,则较高索引的频道将会缺乏。 或者代替fairSelect ,可以使用select ,但是不能进行饥饿分析。 只有在饥饿不成问题时才应使用select机制。

摆脱僵局

与Go一样,使用通道的Java程序必须设计为不会死锁。 在Java中实现低级并发原语非常难以实现 ,并且您需要一些可靠的东西。 幸运的是, Alternative已经通过正式分析以及JCSP渠道得到validation。 这使它成为一个可靠的可靠选择。

为了澄清一点点困惑,目前的JCSP版本在Maven回购中是1.1-rc5 ,而不是网站所说的。

唯一的方法是使用function更强的类的对象替换标准队列,这会在将数据插入空队列时通知消费者。 这个类仍然可以实现BlockingQueue接口,所以另一方(生产者)看不出区别。 诀窍是put操作还应该引发一个标志并通知消费者。 消费者在轮询所有线程后,清除该标志并调用Object.wait()

另一个选择是Java6 +

BlockingDeque实现类:

 import java.lang.ref.WeakReference; import java.util.WeakHashMap; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicLong; class GoChannelPool { private final static GoChannelPool defaultInstance = newPool(); private final AtomicLong serialNumber = new AtomicLong(); private final WeakHashMap> channelWeakHashMap = new WeakHashMap<>(); private final LinkedBlockingDeque totalQueue = new LinkedBlockingDeque<>(); public  GoChannel newChannel() { GoChannel channel = new GoChannel<>(); channelWeakHashMap.put(channel.getId(), new WeakReference(channel)); return channel; } public void select(GoSelectConsumer consumer) throws InterruptedException { consumer.accept(getTotalQueue().take()); } public int size() { return getTotalQueue().size(); } public int getChannelCount() { return channelWeakHashMap.values().size(); } private LinkedBlockingDeque getTotalQueue() { return totalQueue; } public static GoChannelPool getDefaultInstance() { return defaultInstance; } public static GoChannelPool newPool() { return new GoChannelPool(); } private GoChannelPool() {} private long getSerialNumber() { return serialNumber.getAndIncrement(); } private synchronized void syncTakeAndDispatchObject() throws InterruptedException { select(new GoSelectConsumer() { @Override void accept(GoChannelObject t) { WeakReference goChannelWeakReference = channelWeakHashMap.get(t.channel_id); GoChannel channel = goChannelWeakReference != null ? goChannelWeakReference.get() : null; if (channel != null) { channel.offerBuffer(t); } } }); } class GoChannel { // Instance private final long id; private final LinkedBlockingDeque> buffer = new LinkedBlockingDeque<>(); public GoChannel() { this(getSerialNumber()); } private GoChannel(long id) { this.id = id; } public long getId() { return id; } public E take() throws InterruptedException { GoChannelObject object; while((object = pollBuffer()) == null) { syncTakeAndDispatchObject(); } return (E) object.data; } public void offer(E object) { GoChannelObject e = new GoChannelObject(); e.channel_id = getId(); e.data = object; getTotalQueue().offer(e); } protected void offerBuffer(GoChannelObject data) { buffer.offer(data); } protected GoChannelObject pollBuffer() { return buffer.poll(); } public int size() { return buffer.size(); } @Override protected void finalize() throws Throwable { super.finalize(); channelWeakHashMap.remove(getId()); } } class GoChannelObject { long channel_id; E data; boolean belongsTo(GoChannel channel) { return channel != null && channel_id == channel.id; } } abstract static class GoSelectConsumer{ abstract void accept(GoChannelObject t); } } 

然后我们可以这样使用它:

 GoChannelPool pool = GoChannelPool.getDefaultInstance(); final GoChannelPool.GoChannel numberCh = pool.newChannel(); final GoChannelPool.GoChannel stringCh = pool.newChannel(); final GoChannelPool.GoChannel otherCh = pool.newChannel(); ExecutorService executorService = Executors.newCachedThreadPool(); int times; times = 2000; final CountDownLatch countDownLatch = new CountDownLatch(times * 2); final AtomicInteger numTimes = new AtomicInteger(); final AtomicInteger strTimes = new AtomicInteger(); final AtomicInteger defaultTimes = new AtomicInteger(); final int finalTimes = times; executorService.submit(new Runnable() { @Override public void run() { for (int i = 0; i < finalTimes; i++) { numberCh.offer(i); try { Thread.sleep((long) (Math.random() * 10)); } catch (InterruptedException e) { e.printStackTrace(); } } } }); executorService.submit(new Runnable() { @Override public void run() { for (int i = 0; i < finalTimes; i++) { stringCh.offer("s"+i+"e"); try { Thread.sleep((long) (Math.random() * 10)); } catch (InterruptedException e) { e.printStackTrace(); } } } }); int otherTimes = 3; for (int i = 0; i < otherTimes; i++) { otherCh.offer("a"+i); } for (int i = 0; i < times*2 + otherTimes; i++) { pool.select(new GoChannelPool.GoSelectConsumer() { @Override void accept(GoChannelPool.GoChannelObject t) { // The data order should be randomized. System.out.println(t.data); countDownLatch.countDown(); if (t.belongsTo(stringCh)) { strTimes.incrementAndGet(); return; } else if (t.belongsTo(numberCh)) { numTimes.incrementAndGet(); return; } defaultTimes.incrementAndGet(); } }); } countDownLatch.await(10, TimeUnit.SECONDS); /** The console output of data should be randomized. numTimes.get() should be 2000 strTimes.get() should be 2000 defaultTimes.get() should be 3 */ 

并注意select仅在通道属于同一个GoChannelPool时工作,或者只使用默认的GoChannelPool(但是如果太多通道共享同一个GoChannelPool,性能会降低)