正确实现生产者 – 消费者场景和“优雅”终止线程池

我正在开发我的第一个multithreading项目,因此有一些我不确定的事情。 关于我的设置的详细信息是在上一个问题上 ,简而言之:我有一个由Executors.newFixedThreadPool(N)实现的线程池。 一个线程被赋予一个动作,该动作对本地和远程资源执行一系列查询,并迭代地填充ArrayBlockingQueue ,而其余​​线程调用队列上的take()方法并处理队列中的对象。

尽管小型和监督测试似乎运行正常,但我不确定如何处理特殊情况,例如开头(队列还没有项目),结束(队列清空)以及任何最终的InterruptedExceptions 。 我在这里做了一些阅读,然后让我读到 Goetz和Kabutz的两篇非常好的文章。 共识似乎是不应忽视这些例外。 但是我不确定提供的示例是如何与我的情况相关的,我没有在我的代码中的任何地方调用thread.interrupt() …说到这,我不确定我是否应该这样做…

总结一下,鉴于下面的代码,我如何最好地处理特殊情况,例如终止条件和InterrruptedExceptions? 希望问题有意义,否则我会尽力进一步描述。

提前致谢,


编辑:我已经在实施了一段时间了,我遇到了一个新的打嗝,所以我想我会更新情况。 我遇到了遇到ConcurrentModificationException的不幸,这很可能是由于线程池的不完全关闭/终止造成的。 一旦我发现我可以使用isTerminated()我试过,然后由于一个不同步的wait()我得到一个IllegalMonitorStateException 。 代码的当前状态如下:

我已经听过@ Jonathan的回答中的一些建议,但我不认为他的建议与我需要/想要的一样。 背景故事与我上面提到的相同,相关的代码如下:

持有/管理池的类,以及提交runnables:

 public void serve() { try { this.started = true; pool.execute(new QueryingAction(pcqs)); for(;;){ PathwayImpl p = bq.take(); if (p.getId().equals("0")){ System.out.println("--DEBUG: Termination criteria found, shutdown initiated.."); pool.shutdown(); // give 3 minutes per item in queue to finish up pool.awaitTermination(3 * bq.size(), TimeUnit.MINUTES); break; } int sortMethod = AnalysisParameters.getInstance().getSort_method(); pool.submit(new AnalysisAction(p)); } } catch (Exception ex) { ex.printStackTrace(); System.err.println("Unexpected error in core analysis, terminating execution!"); System.exit(0); }finally{ pool.shutdown(); } } public boolean isDone(){ if(this.started) return pool.isTerminated(); else return false; } 

元素通过位于单独类中的以下代码添加到队列中:

 this.queue.offer(path, offer_wait, TimeUnit.MINUTES); 

offer()而不是take()背后的动机正如乔纳森所提到的那样。 不可预见的街区很烦人,很难弄明白,因为我的分析花了很长时间。 因此,如果由于坏块而失败,或者它只是处理数字,我需要相对快速地知道…


最后; 这是我的测试类中的代码,我在其中检查“并发服务”(此处命名为cs)与要分析的其余对象之间的交互:

 cs.serve(); synchronized (this) { while(!cs.isDone()) this.wait(5000); } ReportGenerator rg = new ReportGenerator(); rg.doReports(); 

我意识到这是一个非常长的问题,但我试图详细而具体。 希望它不会太拖累,我道歉,以防万一…

而不是使用take ,哪些块,使用更像这样的东西:

 PathwayImpl p = null; synchronized (bq) { try { while (bq.isEmpty() && !stopSignal) { bq.wait(3000); // Wait up to 3 seconds and check again } if (!stopSignal) { p = bq.poll(); } } catch (InterruptedException ie) { // Broke us out of waiting, loop around to test the stopSignal again } } 

这假设块被包含在某种while (!stopSignal) {...}

然后,在添加到队列的代码中,执行以下操作:

 synchronized (bq) { bq.add(item); bq.notify(); } 

至于InterruptedException ,它们很适合用信号通知线程立即测试停止信号,而不是等到下一次超时和测试。 我建议再次测试你的停止信号,并可能记录exception。

我在发出恐慌信号时使用它们,而不是正常关机,但很少需要这种情况。