多生产者多消费者multithreadingJava

我正在尝试多生产者 – 生产者 – 消费者问题的多个消费者使用案例。 我正在使用BlockingQueue在多个生产者/消费者之间共享公共队列。

以下是我的代码。
制片人

import java.util.concurrent.BlockingQueue; public class Producer implements Runnable { private BlockingQueue inputQueue; private static volatile int i = 0; private volatile boolean isRunning = true; public Producer(BlockingQueue q){ this.inputQueue=q; } public synchronized void run() { //produce messages for(i=0; i<10; i++) { try { inputQueue.put(new Integer(i)); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Produced "+i); } finish(); } public void finish() { //you can also clear here if you wanted isRunning = false; } } 

消费者

 import java.util.concurrent.BlockingQueue; public class Consumer implements Runnable { private BlockingQueue inputQueue; private volatile boolean isRunning = true; private final Integer POISON_PILL = new Integer(-1); Consumer(BlockingQueue queue) { this.inputQueue = queue; } public void run() { //worker loop keeps taking en element from the queue as long as the producer is still running or as //long as the queue is not empty: while(!inputQueue.isEmpty()) { try { Integer queueElement = (Integer) inputQueue.take(); System.out.println("Consumed : " + queueElement.toString()); } catch (Exception e) { e.printStackTrace(); } } System.out.println("Queue "); } //this is used to signal from the main thread that he producer has finished adding stuff to the queue public void finish() { //you can also clear here if you wanted isRunning = false; inputQueue.add(POISON_PILL); } } 

测试类

 import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class ProducerConsumerService { public static void main(String[] args) { //Creating BlockingQueue of size 10 BlockingQueue queue = new ArrayBlockingQueue(10); Producer producer = new Producer(queue); Consumer consumer = new Consumer(queue); //starting producer to produce messages in queue new Thread(producer).start(); //starting producer to produce messages in queue new Thread(producer).start(); //starting consumer to consume messages from queue new Thread(consumer).start(); //starting consumer to consume messages from queue new Thread(consumer).start(); System.out.println("Producer and Consumer has been started"); } } 

运行以下代码时,我看不到正确的输出。

我在这里做错了吗?

有很多你的代码没有意义。 我建议你坐下来弄清楚为什么代码在那里以及它在做什么。

如果删除了isFinshed标志,则不会发生任何变化。

如果您在生产者中删除了synchronized的使用,那么您将拥有并发生产者。 创建仅在同步块volatile中访问的字段没有任何好处。

对于生产者来说,共享循环计数器是没有意义的,如果它们要并发的话。 通常,生产者发送毒丸,而消费者不会消费药丸。 例如,如果你有两个消费者,一个可能会添加药丸,另一个可能会消耗它。 你的消费者会忽略毒丸,因为它会忽略isFinished标志。

您不希望仅因为队列暂时为空而停止使用者。 否则它将不会看到生产者产生的所有消息,可能没有。

包含多个生产者和多个消费者的示例代码

 import java.util.concurrent.*; public class ProducerConsumerDemo { public static void main(String args[]){ BlockingQueue sharedQueue = new LinkedBlockingQueue(); Thread prodThread1 = new Thread(new Producer(sharedQueue,1)); Thread prodThread2 = new Thread(new Producer(sharedQueue,2)); Thread consThread1 = new Thread(new Consumer(sharedQueue,1)); Thread consThread2 = new Thread(new Consumer(sharedQueue,2)); prodThread1.start(); prodThread2.start(); consThread1.start(); consThread2.start(); } } class Producer implements Runnable { private final BlockingQueue sharedQueue; private int threadNo; public Producer(BlockingQueue sharedQueue,int threadNo) { this.threadNo = threadNo; this.sharedQueue = sharedQueue; } @Override public void run() { for(int i=1; i<= 5; i++){ try { int number = i+(10*threadNo); System.out.println("Produced:" + number + ":by thread:"+ threadNo); sharedQueue.put(number); } catch (Exception err) { err.printStackTrace(); } } } } class Consumer implements Runnable{ private final BlockingQueue sharedQueue; private int threadNo; public Consumer (BlockingQueue sharedQueue,int threadNo) { this.sharedQueue = sharedQueue; this.threadNo = threadNo; } @Override public void run() { while(true){ try { int num = sharedQueue.take(); System.out.println("Consumed: "+ num + ":by thread:"+threadNo); } catch (Exception err) { err.printStackTrace(); } } } } 

输出:

 Produced:11:by thread:1 Produced:21:by thread:2 Produced:22:by thread:2 Produced:23:by thread:2 Produced:24:by thread:2 Produced:25:by thread:2 Consumed: 11:by thread:1 Consumed: 22:by thread:1 Consumed: 23:by thread:1 Consumed: 24:by thread:1 Consumed: 25:by thread:1 Produced:12:by thread:1 Consumed: 21:by thread:2 Consumed: 12:by thread:1 Produced:13:by thread:1 Produced:14:by thread:1 Produced:15:by thread:1 Consumed: 13:by thread:2 Consumed: 14:by thread:1 Consumed: 15:by thread:2 

本文提供了BlockingQueue的简单示例生产者和消费者问题

您的代码更改:

  1. 不同的生产者将产生不同的输出而不是相同的输出 生产者线程1生成11-15的数字,生产者线程2生成21-25的数字
  2. 任何Consumer线程都可以使用任何Producer的数据。 与生产者不同,消费者没有使用数据的约束。
  3. 我在Producer和Consumer中都添加了Thread号。

您可以在以下位置找到ExecutorService替代解决方案:

使用队列的生产者/消费者线程

直接实施它并不太难。 下面的示例代码就是这样做的。 它只是将本地变量用于不应该共享的所有内容。

除队列外,只共享一个保持活动生成器数量的线程安全计数器。 使用计数器而不是特殊的“ POISON_PILL ”值,因为这样的标记值不适用于单个队列和多个消费者,因为所有消费者必须识别生产者的完成但仅在所有生产者完成时。

计数器是一个简单的结束条件。 唯一需要注意的是,在检测到计数器达到零之后,必须重新检查队列以避免竞争条件。

作为旁注,使用Java 5提供的并发function并且不使用Generics来实现干净的类型安全代码是没有意义的。

 final AtomicInteger activeProducers=new AtomicInteger(); final BlockingQueue queue=new ArrayBlockingQueue<>(10); Runnable producer=new Runnable() { public void run() { try { for(int i=0; i<10; i++) { Thread.sleep(TimeUnit.SECONDS.toMillis(1)); queue.put(i); System.out.println("Produced "+i); } } catch(InterruptedException ex) { System.err.println("producer terminates early: "+ex); } finally { activeProducers.decrementAndGet(); } } }; Runnable consumer=new Runnable() { public void run() { try { for(;;) { Integer queueElement = queue.poll(1, TimeUnit.SECONDS); if(queueElement!=null) System.out.println("Consumed : " + queueElement); else if(activeProducers.get()==0 && queue.peek()==null) return; } } catch(InterruptedException ex) { System.err.println("consumer terminates early: "+ex); } } }; final int NUM_PRODUCERS = 2, NUM_CONSUMERS = 2; for(int i=0; i