如何并行等待多个阻塞队列?

我有两个独立的阻塞队列。 客户端通常使用第二个阻塞队列中的第一个来检索要处理的元素。

在某些情况下,客户端对两个阻塞队列中的元素感兴趣,无论哪个队列首先提供数据。

客户端如何同时等待这两个队列?

您可以尝试在某种循环中使用poll方法,在轮询另一个队列之前只等待一个队列的指定时间。

除此之外,我会说在不同的线程上为每个队列运行阻塞操作,并为主应用程序提供回调接口是另一个稍微复杂的选项。

我会创建一些聚合队列,将操作委托给实际队列:

 class AggregatedQueue /*implements Queue*/ { final List> queues; final ExecutorService service; final AtomicBoolean shutdown = new AtomicBoolean(false); final BlockingQueue aggregatedQueue = new LinkedBlockingQueue<>(); public AggregatedQueue(List> queues) { this.queues = queues; this.service = Executors.newFixedThreadPool(queues.size()); } public void start() { for (BlockingQueue queue : queues) { service.submit((Runnable) () -> { while (!shutdown.get()) { try { aggregatedQueue.add(queue.take()); } catch (InterruptedException e) { // handle } } }); } } //@Override public E take() throws InterruptedException { return aggregatedQueue.take(); } } 

要使用它,您应该在构造函数中传递队列并调用start方法。 将有1个线程侦听每个队列并将获取的元素传递给聚合队列。

 AggregatedQueue queue = new AggregatedQueue<>(Arrays.asList(q1, q2)); queue.start(); while (listening) { Message msg = queue.take(); dispatch(msg); } queue.shutdown(); 

我遇到了同样的问题,最后编写了自己的并发队列来支持这种使用模式。

由于Java阻塞原语不允许阻塞多个对象,因此解决方案被推送到集合本身: Linked Blocking Multi Queue实际上是由头部连接的一组队列。 元素提供给单个“子队列”,但从单个位置检索(支持阻塞操作)。