流API和队列:订阅BlockingQueue流式

假设我们有一个队列

BlockingQueue queue= new LinkedBlockingQueue(); 

而其他一些线程将值放入其中,然后我们就像读取它一样

 while (true) { String next = queue.take(); System.out.println("next message:" + next); } 

如何以流样式迭代此队列,同时保持与上述代码类似的语义。

此代码仅遍历当前队列状态:

 queue.stream().forEach(e -> System.out.println(e)); 

我猜你有点期待,但我觉得我有一个很好的预感。

队列流(如遍历队列)表示队列的当前内容 。 当迭代器或流到达队列的尾部时,它不会阻止等待添加的其他元素。 迭代器或流在该点耗尽,计算终止。

如果您想要一个包含队列的所有当前和未来元素的流,您可以执行以下操作:

 Stream.generate(() -> { try { return queue.take(); } catch (InterruptedException ie) { return "Interrupted!"; } }) .filter(s -> s.endsWith("x")) .forEach(System.out::println); 

(不幸的是,处理InterruptedException的需要使得它非常混乱。)

请注意,无法关闭队列,并且Stream.generate无法停止生成元素,因此这实际上是无限流。 终止它的唯一方法是使用短路流操作,例如findFirst

您可以查看异步队列实现。 如果你有Java 8,那么独眼巨人反应 ,我是这个项目的开发人员,提供了一个async.Queue,它允许你同步地(并且干净地)填充和使用队列。

例如

 Queue queue = QueueFactories.unboundedQueue().build(); 

或者简单地说(只要这是一个com.aol.simple.react.async.Queue)

 Queue queue = new Queue<>(); 

然后在一个单独的线程中:

 new Thread(() -> { while (true) { queue.add("New message " + System.currentTimeMillis()); } }).start(); 

回到主线程,您的原始代码现在应该按预期工作(不经意地迭代添加到队列中的消息并打印出来)

 queue.stream().forEach(e -> System.out.println(e)); 

Queue和Stream可以在任何阶段关闭 –

 queue.close(); 

另一种方法是构建自定义Spliterator。 在我的情况下,我有一个阻塞队列,我想构建一个继续提取元素的流,直到块超时。 分裂器是这样的:

 public class QueueSpliterator implements Spliterator { private final BlockingQueue queue; private final long timeoutMs; public QueueSpliterator(final BlockingQueue queue, final long timeoutMs) { this.queue = queue; this.timeoutMs = timeoutMs; } @Override public int characteristics() { return Spliterator.CONCURRENT | Spliterator.NONNULL | Spliterator.ORDERED; } @Override public long estimateSize() { return Long.MAX_VALUE; } @Override public boolean tryAdvance(final Consumer action) { try { final T next = this.queue.poll(this.timeoutMs, TimeUnit.MILLISECONDS); if (next == null) { return false; } action.accept(next); return true; } catch (final InterruptedException e) { throw new SupplierErrorException("interrupted", e); } } @Override public Spliterator trySplit() { return null; } } 

处理InterruptedException抛出的exception是RuntimeException的扩展。 使用此类,可以通过以下方式构建流:StreamSupport.stream(new QueueSpliterator(…))并添加常用的流操作。