如何在ForkJoinPool中阻止队列?
我需要在其队列已满时阻止ForkJoinPool上的线程。 这可以在标准的ThreadPoolExecutor中完成,例如:
private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) { return new ThreadPoolExecutor(nThreads, nThreads, 5000L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy()); }
我知道,ForkJoinPool中有一些Dequeue,但是我无法通过它访问它。
更新:请参阅下面的答案。
经过一番研究,我很乐意回答这个问题:
原因:由于以下原因,ForkJoinPool的实现中没有这样的选项。 大多数juc Executors都假设单个并发队列和许multithreading。 这会导致队列争用,并在多个线程读取/写入队列时降低性能。 因此,这种方法不是很可扩展 – >队列上的高争用可以生成大量的上下文切换和CPU业务。
实现:在ForkJoinPool中,每个线程都有一个由数组支持的单独的双端队列( Deque )。 为了最大限度地减少争用, 工作窃取发生在双端队列的尾部,而任务提交发生在当前线程(工作者)的头部。 尾部包含最大部分的工作。 换句话说,通过另一个工作线程从尾部窃取可以最大限度地减少与其他工作人员交互的次数 – >争用更少,整体性能更好。
-
这个想法在Doug Lea的官方白皮书“Java Fork / Join Framework”中有所描述。
-
可伸缩性基准测试显示在“让它崩溃 – Fork加入池的可伸缩性”中
解决方法的想法:有全局提交队列。 来自非FJ线程的提交进入提交队列(工作人员承担这些任务)。 还有上面提到的工人队列。
队列的最大大小受数量限制:
/** * Maximum size for queue arrays. Must be a power of two less * than or equal to 1 << (31 - width of array entry) to ensure * lack of wraparound of index calculations, but defined to a * value a bit less than this to help users trap runaway * programs before saturating systems. */ static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
当队列已满时,将抛出未经检查的exception:
RejectedExecutionException("Queue capacity exceeded")
这在javadocs中有描述。
(另请参阅ThreadPool的UncaughtExceptionHandler
构造函数)
我倾向于声称当前的实现没有这样的机制,这应该由我们在消费API中实现。
例如,这可以按如下方式完成:
- 实现指数后退逻辑,尝试通过增加下一次重试的时间间隔来定期重新提交任务。 要么..
- 编写一个定期检查submissionQueue大小的
ForkJoinPool.getQueuedSubmissionCount()
参见ForkJoinPool.getQueuedSubmissionCount()
)。
以下是ForkJoinPool的官方JSR-166E java代码以获取更多信息。