用于广播的Java并发队列?

我有一堆生产者线程添加到BlockingQueue和一个工作线程获取对象。 现在我想扩展这个,所以两个工作线程正在接受对象,但是对对象做了不同的工作。 这是扭曲: 我想要一个已被放在队列中的对象由两个接收线程处理

如果我继续使用BlockingQueue,则两个线程将竞争对象,并且只有一个工作线程将获得该对象。

所以我正在寻找类似于BlockingQueue的东西,但是有广播行为。

应用程序:生产者线程实际上正在创建性能测量,其中一个工作者正在将测量值写入文件,而另一个工作者正在聚合统计信息。

我正在使用Java 6。

这样的机制是否存在? 在Java SE? 别处? 或者我需要自己编码吗?

我正在寻找占地面​​积小的解决方案 – 我不想安装一些框架​​来做到这一点。

一种选择:有三个阻塞队列。 您的主要制作人将项目放入“广播”队列。 然后,您拥有队列的消费者该消费者使用每个项目,将其放入其他两个队列中,每个队列由一个消费者提供服务:

  Q2 ---- Real Consumer 1 Q1 / Producer ---- Broadcast Consumer \ ---- Real Consumer 2 Q3 

或者,您可以为生产者提供两个阻塞队列,然后让它将它生成的项目放入两者中。 这不太优雅,但总体上稍微简单:)

Jon Skeet的想法很简单。 除此之外,您可以使用破坏程序模式,它更快,并解决了这个问题。 我可以给你一个CoralQueue的代码示例,它是Coral Blocks完成的破坏程序模式的一个实现,我是附属的。 它提供了一个名为Splitter的数据结构,它接受一个提供消息的生产者并接受多个消费者轮询消息,其方式是将所有消息传递给每个消费者

 package com.coralblocks.coralqueue.sample.splitter; import com.coralblocks.coralqueue.splitter.AtomicSplitter; import com.coralblocks.coralqueue.splitter.Splitter; import com.coralblocks.coralqueue.util.Builder; public class Basics { private static final int NUMBER_OF_CONSUMERS = 4; public static void main(String[] args) { Builder builder = new Builder() { @Override public StringBuilder newInstance() { return new StringBuilder(1024); } }; final Splitter splitter = new AtomicSplitter(1024, builder, NUMBER_OF_CONSUMERS); Thread producer = new Thread(new Runnable() { private final StringBuilder getStringBuilder() { StringBuilder sb; while((sb = splitter.nextToDispatch()) == null) { // splitter can be full if the size of the splitter // is small and/or the consumer is too slow // busy spin (you can also use a wait strategy instead) } return sb; } @Override public void run() { StringBuilder sb; while(true) { // the main loop of the thread // (...) do whatever you have to do here... // and whenever you want to send a message to // the other thread you can just do: sb = getStringBuilder(); sb.setLength(0); sb.append("Hello!"); splitter.flush(); // you can also send in batches to increase throughput: sb = getStringBuilder(); sb.setLength(0); sb.append("Hi!"); sb = getStringBuilder(); sb.setLength(0); sb.append("Hi again!"); splitter.flush(); // dispatch the two messages above... } } }, "Producer"); final Thread[] consumers = new Thread[NUMBER_OF_CONSUMERS]; for(int i = 0; i < consumers.length; i++) { final int index = i; consumers[i] = new Thread(new Runnable() { @SuppressWarnings("unused") @Override public void run() { while (true) { // the main loop of the thread // (...) do whatever you have to do here... // and whenever you want to check if the producer // has sent a message you just do: long avail; while((avail = splitter.availableToPoll(index)) == 0) { // splitter can be empty! // busy spin (you can also use a wait strategy instead) } for(int i = 0; i < avail; i++) { StringBuilder sb = splitter.poll(index); // (...) do whatever you want to do with the data // just don't call toString() to create garbage... // copy byte-by-byte instead... } splitter.donePolling(index); } } }, "Consumer" + index); } for(int i = 0; i < consumers.length; i++) { consumers[i].start(); } producer.start(); } } 

免责声明:我是CoralQueue的开发人员之一。