具有有界队列的线程池

我已经看到了线程池执行器实现和它提供的被拒绝的执行策略。 但是,我有一个自定义要求 – 我希望有一个回调机制,在达到队列大小限制时我会收到通知,并说当队列大小减少到最大允许队列大小的80%时。

public interface ISaturatedPoolObserver { void onSaturated(); // called when the blocking queue reaches the size limit void onUnsaturated(); // called when blocking queues size goes below the threshold. } 

我觉得这可以通过子类化线程池执行器来实现,但是是否已经实现了版本? 我很乐意在需要时提供更多细节和我的工作以提供清晰度。

我希望有一个回调机制,当达到队列大小限制时,我会收到通知…

我不会inheritance执行程序,但我会inheritance执行程序使用的BlockingQueue 。 像下面这样的东西应该工作。 如果删除一个条目并且有人将一个条目放入,则checkUnsaturated()周围的代码中存在竞争条件。如果这些条件需要完美,则可能必须在队列上进行同步。 此外,我不知道执行程序实现使用什么方法,因此您可能不需要覆盖其中的一些方法。

 public class ObservableBlockingQueue extends LinkedBlockingQueue { private ISaturatedPoolObserver observer; private int capacity; public ObservableBlockingQueue(ISaturatedPoolObserver observer, int capacity) { super(capacity); this.observer = observer; this.capacity = capacity; } @Override public boolean offer(E o) { boolean offered = super.offer(o); if (!offered) { observer.onSaturated(); } return offered; } @Override public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException { boolean offered = super.offer(o, timeout, unit); if (!offered) { observer.onSaturated(); } return offered; } @Override public E poll() { E e = super.poll(); if (e != null) { checkUnsaturated(); } return e; } @Override public E poll(long timeout, TimeUnit unit) throws InterruptedException { E e = super.poll(timeout, unit); if (e != null) { checkUnsaturated(); } return e; } @Override public E take() throws InterruptedException { E e = super.take(); checkUnsaturated(); return e; } @Override public boolean remove(E e) throws InterruptedException { boolean removed = super.remove(e); if (removed) { checkUnsaturated(); } return removed; } private void checkUnsaturated() { if (super.size() * 100 / capacity < UNSATURATED_PERCENTAGE) { observer.onUnsaturated(); } } } 

所以这里是基于上面答案的代码。 在线程池的工作队列的持续加载期间需要调用饱和和未饱和的调用,并且我相信实现通过使用非阻塞算法来实现它。

此外,此实现可用于阻塞队列的任何实现(原始队列也可以是有界的或无界的)。

我正在使用guava的ForwardingBlockingQueue来编写我的装饰器。 任何建议将不胜感激。

 import java.util.Collection; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.util.concurrent.ForwardingBlockingQueue; /** * @version $Id$ * @param  the type of elements held in this blocking queue. */ public class BoundObservableBlockingQueue extends ForwardingBlockingQueue { /** observer to receive callbacks. */ private final ISaturatedQueueObserver queueBoundObserver; /** original blocking queue being decorated. */ private final BlockingQueue queueDelegate; /** user specified blocking queue bound capacity. */ private final int boundCapacity; /** user specified blocking queue bound capacity. */ private final int boundThreshold; /** flag to represent the saturated state of the queue. */ private final AtomicBoolean isSaturated = new AtomicBoolean(false); /** * * @param pQueue {@link BlockingQueue * @param pQueueBoundObserver {@link ISaturatedQueueObserver} * @param pBoundCapacity saturation capacity for the bound queue. */ public BoundObservableBlockingQueue(final BlockingQueue pQueue, final ISaturatedQueueObserver pQueueBoundObserver, final int pBoundCapacity) { queueDelegate = pQueue; queueBoundObserver = pQueueBoundObserver; boundCapacity = pBoundCapacity; boundThreshold = (int) 0.8 * pBoundCapacity; } /** {@inheritDoc} */ @Override public final boolean offer(final E e) { boolean isOffered = delegate().offer(e); checkSaturated(); return isOffered; } /** {@inheritDoc} */ @Override public final boolean offer(final E e, final long timeout, final TimeUnit unit) throws InterruptedException { boolean isOffered = delegate().offer(e, timeout, unit); checkSaturated(); return isOffered; } /** {@inheritDoc} */ @Override public final E remove() { E element = delegate().remove(); checkUnsaturated(); return element; } /** {@inheritDoc} */ @Override public final E poll() { E element = delegate().poll(); checkUnsaturated(); return element; } /** {@inheritDoc} */ @Override public final E poll(final long timeout, final TimeUnit unit) throws InterruptedException { E element = delegate().poll(timeout, unit); checkUnsaturated(); return element; } /** {@inheritDoc} */ @Override public final E take() throws InterruptedException { E element = delegate().take(); checkUnsaturated(); return element; } /** {@inheritDoc} */ @Override public final boolean remove(final Object o) { boolean isRemoved = delegate().remove(o); checkUnsaturated(); return isRemoved; } /** {@inheritDoc} */ @Override protected final BlockingQueue delegate() { return queueDelegate; } // thread pool uses this only during invocation of shutdown; in which cases call to unSaturated isn't needed because // the queue is no longer ready to accept any more records. /** {@inheritDoc} */ @Override public final int drainTo(final Collection c) { return delegate().drainTo(c); } private void checkUnsaturated() { if (delegate().size() < boundThreshold && isSaturated.get()) { if (isSaturated.compareAndSet(true, false)) { queueBoundObserver.onUnsaturated(); } } } private void checkSaturated() { if ((delegate().size() >= boundCapacity) && !isSaturated.get()) { if (isSaturated.compareAndSet(false, true)) { queueBoundObserver.onSaturated(); } } } }