“关闭”阻塞队列
我在一个非常简单的生产者 – 消费者场景中使用java.util.concurrent.BlockingQueue 。 例如,这个伪代码描述了消费者部分:
class QueueConsumer implements Runnable { @Override public void run() { while(true) { try { ComplexObject complexObject = myBlockingQueue.take(); //do something with the complex object } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } }
到现在为止还挺好。 在阻塞队列的javadoc中,我读到:
BlockingQueue本质上不支持任何类型的“关闭”或“关闭”操作,以指示不再添加任何项目。 这些function的需求和使用倾向于依赖于实现。 例如,一种常见的策略是生产者插入特殊的流末端或毒物对象,这些对象在被消费者采用时会相应地进行解释。
不幸的是,由于使用的generics和ComplexObject的性质,将“毒物对象”推入队列并非易事。 所以这种“常用策略”在我的场景中并不是很方便。
我的问题是:我可以用什么其他好的策略/模式来“关闭”队列?
谢谢!
如果你有一个消费者线程的句柄,你可以打断它。 使用您提供的代码,这将杀死消费者。 我不希望制片人有这个; 它可能不得不以某种方式回调程序控制器,让它知道它已经完成。 然后控制器将中断消费者线程。
在遵守中断之前,您总是可以完成工作。 例如:
class QueueConsumer implements Runnable { @Override public void run() { while(!(Thread.currentThread().isInterrupted())) { try { final ComplexObject complexObject = myBlockingQueue.take(); this.process(complexObject); } catch (InterruptedException e) { // Set interrupted flag. Thread.currentThread().interrupt(); } } // Thread is getting ready to die, but first, // drain remaining elements on the queue and process them. final LinkedList remainingObjects; myBlockingQueue.drainTo(remainingObjects); for(ComplexObject complexObject : remainingObjects) { this.process(complexObject); } } private void process(final ComplexObject complexObject) { // Do something with the complex object. } }
我实际上更喜欢以某种方式毒害队列。 如果你想杀死线程,请让线程自杀。
(很高兴看到有人正确处理InterruptedException
。)
这里似乎存在一些关于处理中断的争论。 首先,我希望大家阅读这篇文章: http : //www.ibm.com/developerworks/java/library/j-jtp05236.html
现在,在理解没有人真正阅读过的情况下,这就是交易。 如果一个线程在InterruptedException
时当前阻塞,它将只接收InterruptedException
。 在这种情况下, Thread.interrupted()
将返回false
。 如果它没有阻塞,它将不会收到此exception,而Thread.interrupted()
将返回true
。 因此,无论如何,您的循环保护应该绝对检查Thread.interrupted()
,否则可能会错过线程的中断。
所以,既然你正在检查Thread.interrupted()
无论如何,你被迫捕获InterruptedException
(即使你没有被强迫也应该处理它),你现在有两个代码区来处理同一个事件,线程中断。 处理此问题的一种方法是将它们规范化为一个条件,这意味着布尔状态检查可以抛出exception,或者exception可以设置布尔状态。 我选择了以后的。
编辑:注意静态Thread#interrupted方法清除当前线程的中断状态。
另一种方法是使用ExecutorService
包装您正在进行的处理,并让ExecutorService
本身控制是否将作业添加到队列中。
基本上,您可以利用ExecutorService.shutdown()
,当被调用时,不允许执行程序处理任何其他任务。
我不确定你当前是如何向你的例子中的QueueConsumer
提交任务的。 我假设您有某种submit()
方法,并在示例中使用了类似的方法。
import java.util.concurrent.*; class QueueConsumer { private final ExecutorService executor = Executors.newSingleThreadExecutor(); public void shutdown() { executor.shutdown(); // gracefully shuts down the executor } // 'Result' is a class you'll have to write yourself, if you want. // If you don't need to provide a result, you can just Runnable // instead of Callable. public Future submit(final ComplexObject complexObject) { if(executor.isShutdown()) { // handle submitted tasks after the executor has been told to shutdown } return executor.submit(new Callable () { @Override public Result call() { return process(complexObject); } }); } private Result process(final ComplexObject complexObject) { // Do something with the complex object. } }
这个例子只是java.util.concurrent
包提供的一个袖口的例证; 可能会对它进行一些优化(例如, QueueConsumer
可能甚至不需要它自己的类;你可以只为任何生产者提交任务提供ExecutorService
)。
挖掘java.util.concurrent
包(从上面的一些链接开始)。 您可能会发现它为您尝试执行的操作提供了很多很好的选择,您甚至不必担心调整工作队列。
使这个简单的另一个想法:
class ComplexObject implements QueueableComplexObject { /* the meat of your complex object is here as before, just need to * add the following line and the "implements" clause above */ @Override public ComplexObject asComplexObject() { return this; } } enum NullComplexObject implements QueueableComplexObject { INSTANCE; @Override public ComplexObject asComplexObject() { return null; } } interface QueueableComplexObject { public ComplexObject asComplexObject(); }
然后使用BlockingQueue
作为队列。 当您希望结束队列的处理时,请执行queue.offer(NullComplexObject.INSTANCE)
。 在消费者方面,做
boolean ok = true; while (ok) { ComplexObject obj = queue.take().asComplexObject(); if (obj == null) ok = false; else process(obj); } /* interrupt handling elided: implement this as you see fit, * depending on whether you watch to swallow interrupts or propagate them * as in your original post */
没有必需的instanceof
,并且您不必构造伪的ComplexObject
,这可能是昂贵/困难的,具体取决于它的实现。
制作毒物对象的另一种可能性:使其成为该类的特定实例。 通过这种方式,您不必捣乱子类型或搞砸通用。
缺点:如果生产者和消费者之间存在某种序列化障碍,这将无效。
public class ComplexObject { public static final POISON_INSTANCE = new ComplexObject(); public ComplexObject(whatever arguments) { } // Empty constructor for creating poison instance. private ComplexObject() { } } class QueueConsumer implements Runnable { @Override public void run() { while(!(Thread.currentThread().interrupted())) { try { final ComplexObject complexObject = myBlockingQueue.take(); if (complexObject == ComplexObject.POISON_INSTANCE) return; // Process complex object. } catch (InterruptedException e) { // Set interrupted flag. Thread.currentThread().interrupt(); } } } }
是否可以扩展ComplexObject并模拟非平凡的创建function? 基本上你最终会得到一个shell对象,但你可以做一个instance of
来查看是否是队列对象的结束。
您可以将通用对象包装到数据对象中。 在此数据对象上,您可以添加其他数据,例如毒物对象状态。 dataobject是一个包含2个字段的类。 T complexObject;
和boolean poison;
。
您的使用者从队列中获取数据对象。 如果返回有毒对象,则关闭使用者,否则打开generics并调用’process(complexObject)’。
我正在使用java.util.concurrent.LinkedBlockingDeque
以便您可以在队列末尾添加对象并从前面获取它们。 这样你的对象将按顺序处理,但更重要的是,在遇到毒物对象后关闭队列是安全的。
为了支持多个使用者,我在遇到它时将毒物对象添加回队列。
public final class Data { private boolean poison = false; private T complexObject; public Data() { this.poison = true; } public Data(T complexObject) { this.complexObject = complexObject; } public boolean isPoison() { return poison; } public T getComplexObject() { return complexObject; } }
public class Consumer implements Runnable { @Override public final void run() { Data data; try { while (!(data = queue.takeFirst()).isPoison()) { process(data.getComplexObject()); } } catch (final InterruptedException e) { Thread.currentThread().interrupt(); return; } // add the poison object back so other consumers can stop too. queue.addLast(line); } }
在这种情况下,您通常必须抛弃generics并使队列保持类型为Object。 那么,你只需要在转换为实际类型之前检查你的“毒药”对象。
对我来说,实现一个BlockingQueue
似乎是合理的:
import java.util.concurrent.BlockingQueue; public interface CloseableBlockingQueue extends BlockingQueue { /** Returns true if this queue is closed, false otherwise. */ public boolean isClosed(); /** Closes this queue; elements cannot be added to a closed queue. **/ public void close(); }
使用以下行为实现此function非常简单(参见方法摘要表 ):
-
插入 :
-
抛出exception , 特殊值 :
行为像一个完整的
Queue
,调用者负责测试isClosed()
。 -
块 :
如果和何时关闭,则抛出
IllegalStateException
。 -
超时 :
如果和关闭时返回
false
,调用者负责测试isClosed()
。
-
-
删除 :
-
抛出exception , 特殊值 :
表现得像一个空的
Queue
,调用者负责测试isClosed()
。 -
块 :
如果和关闭时抛出
NoSuchElementException
。 -
超时 :
如果和关闭时返回
null
,调用者负责测试isClosed()
。
-
-
检查
没变。
我是通过编辑源代码完成此操作,在github.com上找到它。
我用过这个系统:
ConsumerClass private boolean queueIsNotEmpty = true;//with setter ... do { ... sharedQueue.drainTo(docs); ... } while (queueIsNotEmpty || sharedQueue.isEmpty());
当生产者完成时,我将consumerObject,queueIsNotEmpty字段设置为false
今天我使用包装器对象解决了这个问题。 由于ComplexObject对于子类太复杂,我将ComplexObject包装到ComplexObjectWrapper对象中。 然后使用ComplexObjectWrapper作为generics类型。
public class ComplexObjectWrapper { ComplexObject obj; } public class EndOfQueue extends ComplexObjectWrapper{}
现在代替BlockingQueue
我做了BlockingQueue
由于我控制了消费者和生产者,这个解决方案对我有用。