有界PriorityBlockingQueue

PriorityBlockingQueue是无限的,但我需要以某种方式绑定它。 实现这一目标的最佳方法是什么?

有关信息,有界PriorityBlockingQueue将用于ThreadPoolExecutor

注意:通过有界我不想抛出exception,如果发生这种情况,我想将对象放入队列中,然后根据其优先级值进行剪切。 有什么好方法可以做这件事吗?

我实际上不会将它子类化。 虽然我现在无法将示例代码放在一起,但我建议使用装饰模式的一个版本。

创建一个新类并实现您感兴趣的类实现的接口: PriorityBlockingQueue 。 我发现这个类使用了以下接口:

 Serializable, Iterable, Collection, BlockingQueue, Queue 

在类的构造函数中,接受PriorityBlockingQueue作为构造函数参数。

然后通过PriorityblockingQueue的实例实现接口所需的所有方法。 添加使其成为有界所需的任何代码。

这是我在Violet UML中放在一起的快速图表:

BoundedPriorityblockingQueue类图http://theopensourceu.com/wp-content/uploads/2010/03/BoundedPriorityBlockingQueue.png

在Google Collections / Guava库中有一个实现: MinMaxPriorityQueue 。

可以使用最大大小配置最小 – 最大优先级队列。 如果是这样,每次队列的大小超过该值时,队列将根据其比较器(可能是刚刚添加的元素)自动删除其最大元素。 这与传统的有界队列不同,传统的有界队列在满时阻止或拒绝新元素。

在我的头脑中,我将它子类化并覆盖put方法来强制执行此操作。 如果它结束抛出exception或做任何看似合适的事情。

就像是:

 public class LimitedPBQ extends PriorityBlockingQueue { private int maxItems; public LimitedPBQ(int maxItems){ this.maxItems = maxItems; } @Override public boolean offer(Object e) { boolean success = super.offer(e); if(!success){ return false; } else if (this.size()>maxItems){ // Need to drop last item in queue // The array is not guaranteed to be in order, // so you should sort it to be sure, even though Sun's Java 6 // version will return it in order this.remove(this.toArray()[this.size()-1]); } return true; } } 

编辑:添加和添加调用提供,所以覆盖它应该是足够的

编辑2:如果超过maxItems,现在应该删除最后一个元素。 尽管如此,可能还有更优雅的方式。

根据Frank V的建议实现BoundedPriorityBlockingQueue之后,我意识到它并没有完全符合我的要求。 主要问题是我必须插入队列的项目可能比队列中已有的项目具有更高的优先级。 因此我真正想要的是一个’pivot’方法,如果我将一个对象放入队列,当队列已满时,我想要找回最低优先级的对象,而不是阻塞。

为了充实弗兰克五世的建议我使用了以下片段……

 public class BoundedPriorityBlockingQueue implements Serializable, Iterable, Collection, BlockingQueue, Queue, InstrumentedQueue { 

…私人最终的ReentrantLock锁; // = new ReentrantLock(); 私人最终条件不满;

 final private int capacity; final private PriorityBlockingQueue queue; public BoundedPriorityBlockingQueue(int capacity) throws IllegalArgumentException, NoSuchFieldException, IllegalAccessException { if (capacity < 1) throw new IllegalArgumentException("capacity must be greater than zero"); this.capacity = capacity; this.queue = new PriorityBlockingQueue(); // gaining access to private field Field reqField; try { reqField = PriorityBlockingQueue.class.getDeclaredField("lock"); reqField.setAccessible(true); this.lock = (ReentrantLock)reqField.get(ReentrantLock.class); this.notFull = this.lock.newCondition(); } catch (SecurityException ex) { ex.printStackTrace(); throw ex; } catch (NoSuchFieldException ex) { ex.printStackTrace(); throw ex; } catch (IllegalAccessException ex) { ex.printStackTrace(); throw ex; } ... @Override public boolean offer(E e) { this.lock.lock(); try { while (this.size() == this.capacity) notFull.await(); boolean success = this.queue.offer(e); return success; } catch (InterruptedException ie) { notFull.signal(); // propagate to a non-interrupted thread return false; } finally { this.lock.unlock(); } } ... 

这也有一些检测,所以我可以检查队列的有效性。 我仍在研究’PivotPriorityBlockingQueue’,如果有人有兴趣我可以发布它。

如果你想要执行的Runnables的顺序不严格(原样:即使存在更高优先级的任务,也可能会执行一些优先级较低的任务),那么我会建议以下内容,归结为定期删除PriorityQueue缩小尺寸:

 if (queue.size() > MIN_RETAIN * 2){ ArrayList toRetain = new ArrayList(MIN_RETAIN); queue.drainTo(toRetain, MIN_RETAIN); queue.clear(); for (T t : toRetain){ queue.offer(t); } } 

如果订单需要严格,这显然会失败,因为耗尽将导致片刻,wenn低优先级任务将使用并发访问从队列中检索。

优点是,这是线程安全的,并且可能与优先级队列设计一样快。

到目前为止,没有一个答案具有以下所有属性:

  • 实现BlockingQueue接口。
  • 支持删除绝对最大值。
  • 没有比赛条件。

不幸的是,标准Java库中没有BlockingQueue实现。 您将需要找到一个实现或自己实现一些东西。 实现BlockingQueue需要一些正确锁定的知识。

以下是我的建议:看一下https://gist.github.com/JensRantil/30f812dd237039257a3d并将其用作模板,在SortedSet周围实现自己的包装器。 基本上,所有锁定都存在,并且有多个unit testing(需要一些调整)。

ConcurrencyUtils repo中有一个实现。

查看Google Collections API中的ForwardingQueue 。 对于阻塞语义,您可以使用信号量 。

这里有另一个实现

它似乎做你要求的:

BoundedPriorityQueue实现了一个优先级队列,其中包含元素数量的上限。 如果队列未满,则始终添加添加的元素。 如果队列已满并且添加的元素大于队列中的最小元素,则删除最小元素并添加新元素。 如果队列已满并且添加的元素不大于队列中的最小元素,则不会添加新元素。