在Consumer和Producer Threads中等待并通知

刚开始学习multithreading。 我有多个线程的5个生产者和2个消费者。 基本上这个程序会在队列中添加100个项目。 当队列大小为100时,生产者将停止添加。我希望消费者在消费者从队列中删除所有项目时通知生产者,以便生产者可以再次开始添加。 目前生产者将等待,但永远不会得到消费者的通知。

制片人:

public class Producer implements Runnable { private BlockingQueue sharedQueue; private final int queueSize; private Object lock = new Object(); public Producer(BlockingQueue sharedQueue, int queueSize){ this.sharedQueue = sharedQueue; this.queueSize = queueSize; } public void run() { while(true) { if(sharedQueue.size()== queueSize){ try { synchronized (lock) { sharedQueue.wait(); } } catch (InterruptedException e) { e.printStackTrace(); } } try { sharedQueue.put("Producer: " + sharedQueue.size()); Thread.sleep(500); System.out.println("Producer: Queue Size " + sharedQueue.size() + " Current Thread " + Thread.currentThread()); } catch (InterruptedException e) { e.printStackTrace(); } } } } 

消费者:

 public class Consumer implements Runnable{ private BlockingQueue sharedQueue; private final int queueSize; private final int queueEmpty=0; private Object lock = new Object(); public Consumer(BlockingQueue sharedQueue, int queueSize){ this.sharedQueue = sharedQueue; this.queueSize = queueSize; } //Notify awaiting thread if the sharedQueue is empty public void run() { while (true) { if(sharedQueue.size()==queueEmpty){ synchronized (lock) { this.notifyAll(); } } try { sharedQueue.take(); Thread.sleep(800); System.out.println("Consumer: Queue Size " + sharedQueue.size() + " Current Thread" + Thread.currentThread()); }catch(InterruptedException e){ e.printStackTrace(); } } } } 

主要课程

  public class App{ //A simple program to illustrate how producer and consumer pattern works with blocking queue using executor service public static void main( String[] args ) { final BlockingQueue sharedQueue = new ArrayBlockingQueue (100); final int queueSize =100; final int producerNum = 5; final int consumerNum = 2; final ExecutorService executorProducer = Executors.newFixedThreadPool(producerNum); final ExecutorService executorConsumer = Executors.newFixedThreadPool(consumerNum); for(int i=0;i<producerNum;i++){ Producer producer = new Producer(sharedQueue,queueSize); executorProducer.execute(producer); } for(int j=0;j<consumerNum;j++){ Consumer consumer = new Consumer(sharedQueue,queueSize); executorConsumer.execute(consumer); } } } 

从oracle文档页面 :

BlockingQueue实现是线程安全的。 所有排队方法都使用内部锁或其他forms的并发控制以primefaces方式实现其效果

由于您已经在使用BlockingQueues ,因此您可以删除wait()notify() API。

使用BlockingQueue多个生产者和消费者的示例代码:

 import java.util.concurrent.*; public class ProducerConsumerDemo { public static void main(String args[]){ BlockingQueue sharedQueue = new LinkedBlockingQueue(); Thread prodThread1 = new Thread(new Producer(sharedQueue,1)); Thread prodThread2 = new Thread(new Producer(sharedQueue,2)); Thread consThread1 = new Thread(new Consumer(sharedQueue,1)); Thread consThread2 = new Thread(new Consumer(sharedQueue,2)); prodThread1.start(); prodThread2.start(); consThread1.start(); consThread2.start(); } } class Producer implements Runnable { private final BlockingQueue sharedQueue; private int threadNo; public Producer(BlockingQueue sharedQueue,int threadNo) { this.threadNo = threadNo; this.sharedQueue = sharedQueue; } @Override public void run() { for(int i=1; i<= 5; i++){ try { int number = i+(10*threadNo); System.out.println("Produced:" + number + ":by thread:"+ threadNo); sharedQueue.put(number); } catch (Exception err) { err.printStackTrace(); } } } } class Consumer implements Runnable{ private final BlockingQueue sharedQueue; private int threadNo; public Consumer (BlockingQueue sharedQueue,int threadNo) { this.sharedQueue = sharedQueue; this.threadNo = threadNo; } @Override public void run() { while(true){ try { int num = sharedQueue.take(); System.out.println("Consumed: "+ num + ":by thread:"+threadNo); } catch (Exception err) { err.printStackTrace(); } } } } 

它是如何工作的?

  1. 生产者线程1将整数从11到15放入BlockingQueue
  2. 生产者线程2将整数从21到25放入BlockingQueue
  3. 任何消费者线程 – 线程1或线程2从BlockingQueue读取值(在此示例中为整数)

样本输出:

 Produced:21:by thread:2 Produced:11:by thread:1 Produced:12:by thread:1 Produced:13:by thread:1 Produced:14:by thread:1 Produced:22:by thread:2 Produced:23:by thread:2 Produced:24:by thread:2 Produced:25:by thread:2 Consumed: 21:by thread:1 Consumed: 12:by thread:1 Consumed: 13:by thread:1 Consumed: 14:by thread:1 Consumed: 22:by thread:1 Consumed: 23:by thread:1 Consumed: 24:by thread:1 Consumed: 25:by thread:1 Produced:15:by thread:1 Consumed: 11:by thread:2 Consumed: 15:by thread:1