生产者消费者程序在Java中使用wait()和notify()

我正在使用低级同步和wait()和notify()在Java中执行经典的Producer-Consumer问题。 我知道有更好的实现使用java.util.concurrent包中的结构,但我的问题围绕低级实现:

private static ArrayList list = new ArrayList(); static Object obj = new Object(); public static void producer() throws InterruptedException { synchronized (obj) { while (true) { if (list.size() == 10) { System.out.println("Queue full.. Waiting to Add"); obj.wait(); } else { int value = new Random().nextInt(100); if (value <= 10) { Thread.sleep(200); System.out.println("The element added was : " + value); list.add(value); obj.notify(); } } } } } public static void consumer() throws InterruptedException { synchronized (obj) { while (true) { Thread.sleep(500); if (list.size() == 0) { System.out.println("Queue is empty...Waiting to remove"); obj.wait(); } else { System.out.println("The element removed was : " + list.remove(0)); obj.notify(); } } } } 

程序中有2个线程,具体为生产者和消费者各1个。 代码工作得很好。

唯一的问题是生产者继续生成消息,直到达到最大值(直到列表的大小为10),消费者一次消耗所有10个消息。

如何让生产者和消费者同时工作?

这是示例输出:

 The element added was : 4 The element added was : 0 The element added was : 0 The element added was : 4 The element added was : 3 The element added was : 1 The element added was : 10 The element added was : 10 The element added was : 3 The element added was : 9 Queue full.. Waiting to Add The element removed was : 4 The element removed was : 0 The element removed was : 0 The element removed was : 4 The element removed was : 3 The element removed was : 1 The element removed was : 10 The element removed was : 10 The element removed was : 3 The element removed was : 9 Queue is empty...Waiting to remove 

编辑:这是更正后的代码:

 private static ArrayList list = new ArrayList(); private static Object obj = new Object(); public static void producer() throws InterruptedException { while (true) { Thread.sleep(500); if (list.size() == 10) { System.out.println("Waiting to add"); synchronized (obj) { obj.wait(); } } synchronized (obj) { int value = new Random().nextInt(10); list.add(value); System.out.println("Added to list: " + value); obj.notify(); } } } public static void consumer() throws InterruptedException { while (true) { Thread.sleep(500); if (list.size() == 0) { System.out.println("Waiting to remove"); synchronized (obj) { obj.wait(); } } synchronized (obj) { int removed = list.remove(0); System.out.println("Removed from list: " + removed); obj.notify(); } } } 

您不能在同步块中使用同一对象运行两个线程。 当一个方法运行时,另一个方法无法运行,直到另一个线程调用wait方法。

要解决此问题,您应该在synchronized块中addremove 。 有关更多信息,请参阅此

生产者和消费者问题是多进程同步问题的典型例子。 这描述了共享公共资源缓冲区的两个进程,生产者和消费者。 生产者工作是生成数据并将其放入缓冲区,而消费者作业使用生成的数据并从缓冲区中删除。

生产者必须确保在缓冲区已满时不应添加任何元素,它应调用wait()直到使用者消耗一些数据并notify生产者线程并且消费者必须确保它不应该尝试从缓冲区中删除项目,它应该调用wait() ,它只是等待生产者生成数据并将其添加到缓冲区并使用notifynotifyAll notify消费者。

使用BlockingQueue接口可以解决此问题,该接口管理此生产者和消费者实现自己。

 import java.util.ArrayList; import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; /* * To change this license header, choose License Headers in Project `Properties`. * To change this template file, choose Tools | Templates * and open the template in the editor. */ /** * * @author sakshi */ public class ThreadProducer { static List list = new ArrayList(); static class Producer implements Runnable { List list; public Producer(List list) { this.list = list; } @Override public void run() { synchronized (list) { for (int i = 0; i < 10; i++) { if (list.size() >= 1) { try { System.out.println("producer is waiting "); list.wait(); } catch (InterruptedException ex) { ex.printStackTrace(); } } System.out.println("produce=" + i); list.add(i); list.notifyAll(); try { Thread.sleep(500); } catch (InterruptedException ex) { ex.printStackTrace(); } } } //To change body of generated methods, choose Tools | Templates. } } static class Consumer implements Runnable { List list; public Consumer(List list) { this.list = list; } @Override public void run() { synchronized (list) { for (int i = 0; i < 10; i++) { while (list.isEmpty()) { System.out.println("Consumer is waiting"); try { list.wait(); } catch (InterruptedException ex) { ex.printStackTrace();; } } int k = list.remove(0); System.out.println("consume=" + k); list.notifyAll(); try { Thread.sleep(500); } catch (InterruptedException ex) { ex.printStackTrace(); } } } } } public static void main(String[] args) { Thread producer = new Thread(new Producer(list)); Thread consumer = new Thread(new Consumer(list)); producer.start(); consumer.start(); } } 

输出:

 produce=0 producer is waiting consume=0 Consumer is waiting produce=1 producer is waiting consume=1 Consumer is waiting produce=2 producer is waiting consume=2 Consumer is waiting produce=3 producer is waiting consume=3 Consumer is waiting produce=4 producer is waiting consume=4 Consumer is waiting produce=5 producer is waiting consume=5 Consumer is waiting produce=6 producer is waiting consume=6 Consumer is waiting produce=7 producer is waiting consume=7 Consumer is waiting produce=8 producer is waiting consume=8 Consumer is waiting produce=9 consume=9 

不要使用list.size() == 10 ,而是可以检查list.size == 1

对于生产者生产,一个人等待消费者消费。 请参阅此生产者消费者问题 – 使用等待和通知的解决方案在Java中

class Resources {

 private final int capacity = 2; public static int value = 0; LinkedList < Integer > list; Resources() { list = new LinkedList < > (); } void consume() throws InterruptedException { while (true) { synchronized(this) { while (list.size() == 0) { wait(); } int val = list.removeFirst(); System.out.println("Value consumed:" + val); notify(); //Thread.sleep(1000); } } } void produce() throws InterruptedException { while (true) { synchronized(this) { while (list.size() == capacity) { wait(); } System.out.println("Value produced:" + value); list.add(value++); notify(); Thread.sleep(1000); } } } 

}

class MyThread5扩展Thread {

 Resources rs; String name; public String getNames() { return name; } public MyThread5(Resources rs, String name) { this.rs = rs; this.name = name; } @Override public void run() { if (this.getNames().equals("Producer")) { try { this.rs.produce(); } catch (InterruptedException ex) { Logger.getLogger(MyThread5.class.getName()).log(Level.SEVERE, null, ex); } } else { try { this.rs.consume(); } catch (InterruptedException ex) { Logger.getLogger(MyThread5.class.getName()).log(Level.SEVERE, null, ex); } } } 

}

public class ProducerConsumerExample {

 public static void main(String[] args) { try { Resources rs = new Resources(); MyThread5 m1 = new MyThread5(rs, "Producer"); MyThread5 m2 = new MyThread5(rs, "Consumer"); m1.start(); m2.start(); m1.join(); m2.join(); } catch (InterruptedException ex) { Logger.getLogger(ProducerConsumerExample.class.getName()).log(Level.SEVERE, null, ex); } } 

}