生产者 – Java中的消费者multithreading

我想在Java中使用multithreading等待和通知方法编写程序。
该程序有一个堆栈(max-length = 5)。 生产者永远生成数字并将其放入堆栈中,消费者从堆栈中选择它。

当堆栈已满时,生产者必须等待,当堆栈为空时,消费者必须等待。
问题是它只运行一次,我的意思是一旦它产生5个数字就会停止,但是我将run方法放入while(true)块以运行不间断但它没有。
这是我到目前为止所尝试的。
制片人类:

package trail; import java.util.Random; import java.util.Stack; public class Thread1 implements Runnable { int result; Random rand = new Random(); Stack A = new Stack(); public Thread1(Stack A) { this.A = A; } public synchronized void produce() { while (A.size() >= 5) { System.out.println("List is Full"); try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } result = rand.nextInt(10); System.out.println(result + " produced "); A.push(result); System.out.println(A); this.notify(); } @Override public void run() { System.out.println("Producer get started"); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } while (true) { produce(); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } } 

和消费者:

 package trail; import java.util.Stack; public class Thread2 implements Runnable { Stack A = new Stack(); public Thread2(Stack A) { this.A = A; } public synchronized void consume() { while (A.isEmpty()) { System.err.println("List is empty" + A + A.size()); try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.err.println(A.pop() + " Consumed " + A); this.notify(); } @Override public void run() { System.out.println("New consumer get started"); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } while (true) { consume(); } } } 

这是主要方法:

 public static void main(String[] args) { Stack stack = new Stack(); Thread1 thread1 = new Thread1(stack);// p Thread2 thread2 = new Thread2(stack);// c Thread A = new Thread(thread1); Thread B = new Thread(thread2); Thread C = new Thread(thread2); A.start(); B.start(); C.start(); } 

我认为如果你试图分离目前混合的三件事情,那么理解和处理同步会更好:

  1. 任务即将完成实际工作。 Thread2Thread2类的名称具有误导性。 它们不是Thread对象,但实际上它们是实现您为Thread对象提供的Runnable接口的作业或任务。

  2. 您在main中创建的线程对象本身

  3. 共享对象,它封装了队列,堆栈等上的同步操作/逻辑。此对象将在任务之间共享。 在这个共享对象中,您将负责添加/删除操作(使用synchronized块或同步方法)。 目前(正如已经指出的那样),同步是在任务本身上完成的(即每个任务等待并通知其自身锁定并且没有任何反应)。 当你分开关注点,即让一个class级做一件事情时,最终会清楚问题出在哪里。

您的使用者和您的生产者在不同的对象上同步,并且不会相互阻塞。 如果这样有效,我敢说这是偶然的。

阅读java.util.concurrent.BlockingQueuejava.util.concurrent.ArrayBlockingQueue 。 这些为您提供了更现代,更简单的方法来实现此模式。

http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/BlockingQueue.html

您应该在堆栈上进行同步而不是将其放在方法级别尝试此代码。

也不要在你的线程类中初始化堆栈,无论你是在主类的构造函数中传递它们,所以不需要那样做。

始终尽量避免使用synchronized关键字标记任何方法而不是尝试将关键部分代码放入同步块中,因为同步区域的大小越大,它将对性能产生影响。

因此,始终只将该代码放入需要线程安全的同步块中。

制片人代码:

 public void produce() { synchronized (A) { while (A.size() >= 5) { System.out.println("List is Full"); try { A.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } result = rand.nextInt(10); System.out.println(result + " produced "); A.push(result); System.out.println("stack ---"+A); A.notifyAll(); } } 

消费者代码:

 public void consume() { synchronized (A) { while (A.isEmpty()) { System.err.println("List is empty" + A + A.size()); try { System.err.println("wait"); A.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.err.println(A.pop() + " Consumed " + A); A.notifyAll(); } } 

尝试这个:

 import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class CircularArrayQueue { private volatile Lock rwLock = new ReentrantLock(); private volatile Condition emptyCond = rwLock.newCondition(); private volatile Condition fullCond = rwLock.newCondition(); private final int size; private final Object[] buffer; private volatile int front; private volatile int rare; /** * @param size */ public CircularArrayQueue(int size) { this.size = size; this.buffer = new Object[size]; this.front = -1; this.rare = -1; } public boolean isEmpty(){ return front == -1; } public boolean isFull(){ return (front == 0 && rare == size-1) || (front == rare + 1); } public void enqueue(T item){ try { // get a write lock rwLock.lock(); // if the Q is full, wait the write lock if(isFull()) fullCond.await(); if(rare == -1){ rare = 0; front = 0; } else if(rare == size - 1){ rare = 0; } else { rare ++; } buffer[rare] = item; //System.out.println("Added\t: " + item); // notify the reader emptyCond.signal(); } catch(InterruptedException e){ e.printStackTrace(); } finally { // unlock the write lock rwLock.unlock(); } } public T dequeue(){ T item = null; try{ // get the read lock rwLock.lock(); // if the Q is empty, wait the read lock if(isEmpty()) emptyCond.await(); item = (T)buffer[front]; //System.out.println("Deleted\t: " + item); if(front == rare){ front = rare = -1; } else if(front == size - 1){ front = 0; } else { front ++; } // notify the writer fullCond.signal(); } catch (InterruptedException e){ e.printStackTrace(); } finally{ // unlock read lock rwLock.unlock(); } return item; } } 

您可以使用Java的awesome java.util.concurrent包及其类。

您可以使用BlockingQueue轻松实现生产者消费者问题。 BlockingQueue已经支持在检索元素时等待队列变为非空的操作,并且在存储元素时等待队列中的空间可用。

如果没有BlockingQueue ,每次我们将数据放入生产者端的队列时,我们需要检查队列是否已满,如果已满,请等待一段时间,再次检查并继续。 同样在消费者方面,我们必须检查队列是否为空,如果为空,则等待一段时间,再次检查并继续。 但是对于BlockingQueue我们不必编写任何额外的逻辑,只需添加Producer中的数据并从Consumer中轮询数据。

阅读更多来自:

http://javawithswaranga.blogspot.in/2012/05/solving-producer-consumer-problem-in.html

http://www.javajee.com/producer-consumer-problem-in-java-using-blockingqueue

使用BlockingQueue,LinkedBlockingQueue这非常简单。 http://developer.android.com/reference/java/util/concurrent/BlockingQueue.html

 package javaapplication; import java.util.Stack; import java.util.logging.Level; import java.util.logging.Logger; public class ProducerConsumer { public static Object lock = new Object(); public static Stack stack = new Stack(); public static void main(String[] args) { Thread producer = new Thread(new Runnable() { int i = 0; @Override public void run() { do { synchronized (lock) { while (stack.size() >= 5) { try { lock.wait(); } catch (InterruptedException e) { } } stack.push(++i); if (stack.size() >= 5) { System.out.println("Released lock by producer"); lock.notify(); } } } while (true); } }); Thread consumer = new Thread(new Runnable() { @Override public void run() { do { synchronized (lock) { while (stack.empty()) { try { lock.wait(); } catch (InterruptedException ex) { Logger.getLogger(ProdCons1.class.getName()).log(Level.SEVERE, null, ex); } } while(!stack.isEmpty()){ System.out.println("stack : " + stack.pop()); } lock.notifyAll(); } } while (true); } }); producer.start(); consumer.start(); } } 

看看这个代码示例:

 import java.util.concurrent.*; import java.util.Random; public class ProducerConsumerMulti { public static void main(String args[]){ BlockingQueue sharedQueue = new LinkedBlockingQueue(); Thread prodThread = new Thread(new Producer(sharedQueue,1)); Thread consThread1 = new Thread(new Consumer(sharedQueue,1)); Thread consThread2 = new Thread(new Consumer(sharedQueue,2)); prodThread.start(); consThread1.start(); consThread2.start(); } } class Producer implements Runnable { private final BlockingQueue sharedQueue; private int threadNo; private Random rng; public Producer(BlockingQueue sharedQueue,int threadNo) { this.threadNo = threadNo; this.sharedQueue = sharedQueue; this.rng = new Random(); } @Override public void run() { while(true){ try { int number = rng.nextInt(100); System.out.println("Produced:" + number + ":by thread:"+ threadNo); sharedQueue.put(number); Thread.sleep(100); } catch (Exception err) { err.printStackTrace(); } } } } class Consumer implements Runnable{ private final BlockingQueue sharedQueue; private int threadNo; public Consumer (BlockingQueue sharedQueue,int threadNo) { this.sharedQueue = sharedQueue; this.threadNo = threadNo; } @Override public void run() { while(true){ try { int num = sharedQueue.take(); System.out.println("Consumed: "+ num + ":by thread:"+threadNo); Thread.sleep(100); } catch (Exception err) { err.printStackTrace(); } } } } 

笔记:

  1. 根据您的问题陈述启动了一个Producer和两个Consumers
  2. Producer将在无限循环中产生0到100之间的随机数
  3. Consumer将在无限循环中消耗这些数字
  4. ProducerConsumer共享锁定免费和线程安全LinkedBlockingQueue这是线程安全。 如果使用这些高级并发结构,则可以删除wait()和notify()方法。

好像你跳过了关于wait()notify()synchronized 。 看到这个例子 ,它应该对你有帮助。