生产者 – 消费; 消费者如何停止?

所以我模拟了我的生产者消费者问题,我有下面的代码。 我的问题是:如果消费者处于不变的状态,消费者将如何停止(真实)。

在下面的代码中,我添加了

if (queue.peek()==null) Thread.currentThread().interrupt(); 

在这个例子中很好用。 但是在我的真实世界设计中,这不起作用(有时生产者需要更长的时间来“放置”数据,因此消费者抛出的exception是不正确的。一般来说,我知道我可以放一个’毒’数据比如Object是XYZ,我可以在消费者中检查它。但这种毒药使代码看起来真的很糟糕。想知道是否有人采用不同的方法。

 public class ConsumerThread implements Runnable { private BlockingQueue queue; private String name; private boolean isFirstTimeConsuming = true; public ConsumerThread(String name, BlockingQueue queue) { this.queue=queue; this.name=name; } @Override public void run() { try { while (true) { if (isFirstTimeConsuming) { System.out.println(name+" is initilizing..."); Thread.sleep(4000); isFirstTimeConsuming=false; } try{ if (queue.peek()==null) Thread.currentThread().interrupt(); Integer data = queue.take(); System.out.println(name+" consumed ------->"+data); Thread.sleep(70); }catch(InterruptedException ie) { System.out.println("InterruptedException!!!!"); break; } } System.out.println("Comsumer " + this.name + " finished its job; terminating."); }catch (InterruptedException e) { e.printStackTrace(); } } 

}

答:根本不能保证只是因为peek返回null ,生产者已停止生产。 如果制作人放慢速度怎么办? 现在,消费者退出,生产者继续生产。 所以’偷看’ – >’rest’的想法基本上都失败了。

B:在消费者中设置’done / run’标志并在生产者中读取它也会失败,如果:

  1. 消费者检查旗帜,发现它应该继续运行,然后做“拿”
  2. 与此同时,制片人将旗帜设置为“不要跑”
  3. 现在消费者永远等待鬼包

相反的情况也可能发生,并且一个数据包被遗漏掉了。

然后,为了解决这个问题,你需要在’BlockingQueue’之上与互斥锁进行额外的同步。

C:我觉得’Rosetta Code’是决定什么是好习惯的好来源,在这种情况下:

http://rosettacode.org/wiki/Synchronous_concurrency#Java

生产者和消费者必须就表示输入结束的对象(或对象中的属性)达成一致。 然后生产者在最后一个数据包中设置该属性,并且消费者停止消费它。 即你在你的问题中提到的“毒药”。

在上面的Rosetta Code示例中,这个’对象’只是一个名为’EOF’的空String

 final String EOF = new String(); // Producer while ((line = br.readLine()) != null) queue.put(line); br.close(); // signal end of input queue.put(EOF); // Consumer while (true) { try { String line = queue.take(); // Reference equality if (line == EOF) break; System.out.println(line); linesWrote++; } catch (InterruptedException ie) { } } 

不要在Thread上使用中断 ,而是在不再需要时断开循环:

 if (queue.peek()==null) break; 

或者您也可以使用变量来标记关闭操作挂起,然后在以下情况之后断开循环并关闭循环:

 if (queue.peek()==null) closing = true; //Do further operations ... if(closing) break; 

在现实世界中,大多数消息传递都带有某种类型的头,它定义了消息类型/子类型或者可能是不同的对象。

您可以创建一个命令和控制对象或消息类型,告诉线程在获取消息时执行某些操作(如关闭,重新加载表,添加新的侦听器等)。

这样,您可以说命令和控制线程只是将消息发送到正常的消息流中。 您可以让CNC线程与大型系统中的操作终端通信等。

如果您的队列在您希望消费者终止之前可以清空,那么您需要一个标志来告诉线程何时停止。 添加setter方法,以便生产者可以告诉使用者关闭。 然后修改您的代码,而不是:

 if (queue.isEmpty()) break; 

检查你的代码

 if (!run) { break; } else if (queue.isEmpty()) { Thread.sleep(200); continue; }