如何使用信号量解决生产者 – 消费者?

我需要编写类似于生产者 – 消费者的问题,必须使用信号量。 我尝试了几种解决方案,但没有一种能够解决问题。 首先,我在维基百科上尝试了一个解决方案但它没有用。 我目前的代码是这样的:

消费者的方法运行:

public void run() { int i=0; DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss"); String s = new String(); while (1!=2){ Date datainicio = new Date(); String inicio=dateFormat.format(datainicio); try { Thread.sleep(1000);///10000 } catch (InterruptedException e) { System.out.println("Excecao InterruptedException lancada."); } //this.encheBuffer.down(); this.mutex.down(); // RC i=0; while (i<buffer.length) { if (buffer[i] == null) { i++; } else { break; } } if (i<buffer.length) { QuantidadeBuffer.quantidade--; Date datafim = new Date(); String fim=dateFormat.format(datafim); int identificador; identificador=buffer[i].getIdentificador()[0]; s="Consumidor Thread: "+Thread.currentThread()+" Pedido: "+identificador+" Inicio: "+inicio+" Fim: "+fim+" posicao "+i; //System.out.println("Consumidor Thread: "+Thread.currentThread()+" Pedido: "+identificador+" Inicio: "+inicio+" Fim: "+fim+" posicao "+i); buffer[i]= null; } // RC this.mutex.up(); //this.esvaziaBuffer.up(); System.out.println(s); // lock.up(); } } 

生产者的方法运行:

  public void run() { DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss"); int i=0; while (1!=2){ Date datainicio = new Date(); String inicio=dateFormat.format(datainicio); // Produz Item try { Thread.sleep(500);//50000 } catch (InterruptedException e) { System.out.println("Excecao InterruptedException lancada."); } //this.esvaziaBuffer.down(); this.mutex.down(); // RC i=0; while (i<buffer.length) { if (buffer[i]!=null) { i++; } else { break; } } if (i<buffer.length) { int identificador[]=new int[Pedido.getTamanho_identificador()]; identificador[0]=i; buffer[i]=new Pedido(); Produtor.buffer[i].setIdentificador(identificador); Produtor.buffer[i].setTexto("pacote de dados"); QuantidadeBuffer.quantidade++; Date datafim = new Date(); String fim=dateFormat.format(datafim); System.out.println("Produtor Thread: "+Thread.currentThread()+" Pedido: "+identificador[0]+" Inicio: "+inicio+" Fim: "+fim+" posicao "+i); i++; } // RC this.mutex.up(); //this.encheBuffer.up(); } //this.encheBuffer.up(); } 

在上面的代码中,它发生了一个消费者线程来读取一个位置然后,另一个线程读取相同的位置而没有生成器填充该位置,如下所示:

 Consumidor Thread: Thread[Thread-17,5,main] Pedido: 1 Inicio: 2011/11/27 17:23:33 Fim: 2011/11/27 17:23:34 posicao 1 Consumidor Thread: Thread[Thread-19,5,main] Pedido: 1 Inicio: 2011/11/27 17:23:33 Fim: 2011/11/27 17:23:34 posicao 1 

看来你使用的是互斥量而不是信号量?

在使用互斥锁时,您只有二进制同步 – 锁定和解锁一个资源。 Sempahores具有您可以发出信号或获得的价值。

您正试图锁定/解锁整个缓冲区,但这是错误的方法,因为正如您所看到的那样,生产者或消费者锁定,当读者锁定它时,生产者无法填充缓冲区(因为它必须首先锁定)。

您应该创建一个Sempahore,然后当生产者写入一个数据包或数据块时,它可以发信号通知信号量。 然后,消费者可以尝试获取信号量,这样他们就会等待,直到生产者发信号通知已写入数据包。 在发信号通知写入的数据包时,其中一个消费者将被唤醒并且它将知道它可以读取一个数据包。 它可以读取数据包,然后返回尝试获取信号量。 如果在那个时候生产者已经写了另一个数据包,它再次发出信号,然后其中一个消费者继续读取另一个数据包。 等等…

例如:

(制作人) – 写一个数据包 – Semaphore.release(1)

(消费者xN) – Semaphore.acquire(1) – 读取一个数据包

如果您有多个消费者,那么消费者 (而不是生产者)应该在读取数据包时锁定缓冲区(但在获取信号量时应该)以防止竞争条件。 在下面的示例中,生产者还会锁定列表,因为所有内容都在同一个JVM上。

 import java.util.LinkedList; import java.util.concurrent.Semaphore; public class Semaphores { static Object LOCK = new Object(); static LinkedList list = new LinkedList(); static Semaphore sem = new Semaphore(0); static Semaphore mutex = new Semaphore(1); static class Consumer extends Thread { String name; public Consumer(String name) { this.name = name; } public void run() { try { while (true) { sem.acquire(1); mutex.acquire(); System.out.println("Consumer \""+name+"\" read: "+list.removeFirst()); mutex.release(); } } catch (Exception x) { x.printStackTrace(); } } } static class Producer extends Thread { public void run() { try { int N = 0; while (true) { mutex.acquire(); list.add(new Integer(N++)); mutex.release(); sem.release(1); Thread.sleep(500); } } catch (Exception x) { x.printStackTrace(); } } } public static void main(String [] args) { new Producer().start(); new Consumer("Alice").start(); new Consumer("Bob").start(); } } 
 import java.util.ArrayList; import java.util.List; import java.util.concurrent.Semaphore; import java.util.logging.Level; import java.util.logging.Logger; /* * To change this license header, choose License Headers in Project Properties. * To change this template file, choose Tools | Templates * and open the template in the editor. */ /** * * @author sakshi */ public class SemaphoreDemo { static Semaphore producer = new Semaphore(1); static Semaphore consumer = new Semaphore(0); static List list = new ArrayList(); static class Producer extends Thread { List list; public Producer(List list) { this.list = list; } public void run() { for (int i = 0; i < 10; i++) { try { producer.acquire(); } catch (InterruptedException ex) { Logger.getLogger(SemaphoreDemo.class.getName()).log(Level.SEVERE, null, ex); } System.out.println("produce=" + i); list.add(i); consumer.release(); } } } static class Consumer extends Thread { List list; public Consumer(List list) { this.list = list; } public void run() { for (int i = 0; i < 10; i++) { try { consumer.acquire(); } catch (InterruptedException ex) { Logger.getLogger(SemaphoreDemo.class.getName()).log(Level.SEVERE, null, ex); } System.out.println("consume=" + list.get(i)); producer.release(); } } } public static void main(String[] args) { Producer produce = new Producer(list); Consumer consume = new Consumer(list); produce.start(); consume.start(); } } output: produce=0 consume=0 produce=1 consume=1 produce=2 consume=2 produce=3 consume=3 produce=4 consume=4 produce=5 consume=5 produce=6 consume=6 produce=7 consume=7 produce=8 consume=8 produce=9 consume=9 
 import java.util.concurrent.Semaphore; public class ConsumerProducer{ public static void main(String[] args) { Semaphore semaphoreProducer=new Semaphore(1); Semaphore semaphoreConsumer=new Semaphore(0); System.out.println("semaphoreProducer permit=1 | semaphoreConsumer permit=0"); new Producer(semaphoreProducer,semaphoreConsumer).start(); new Consumer(semaphoreConsumer,semaphoreProducer).start(); } /** * Producer Class. */ static class Producer extends Thread{ Semaphore semaphoreProducer; Semaphore semaphoreConsumer; public Producer(Semaphore semaphoreProducer,Semaphore semaphoreConsumer) { this.semaphoreProducer=semaphoreProducer; this.semaphoreConsumer=semaphoreConsumer; } public void run() { for(;;){ try { semaphoreProducer.acquire(); System.out.println("Produced : "+Thread.currentThread().getName()); semaphoreConsumer.release(); } catch (InterruptedException e) { e.printStackTrace(); } } } } /** * Consumer Class. */ static class Consumer extends Thread{ Semaphore semaphoreConsumer; Semaphore semaphoreProducer; public Consumer(Semaphore semaphoreConsumer,Semaphore semaphoreProducer) { this.semaphoreConsumer=semaphoreConsumer; this.semaphoreProducer=semaphoreProducer; } public void run() { for(;;){ try { semaphoreConsumer.acquire(); System.out.println("Consumed : "+Thread.currentThread().getName()); semaphoreProducer.release(); } catch (InterruptedException e) { e.printStackTrace(); } } } } }