Tag: fork join

如何在ForkJoinPool中阻止队列?

我需要在其队列已满时阻止ForkJoinPool上的线程。 这可以在标准的ThreadPoolExecutor中完成,例如: private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) { return new ThreadPoolExecutor(nThreads, nThreads, 5000L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy()); } 我知道,ForkJoinPool中有一些Dequeue,但是我无法通过它访问它。 更新:请参阅下面的答案。

scala.concurrent.forkjoin.ForkJoinPool vs java.util.concurrent.ForkJoinPool

为什么ForkJoinPool是为Scala分叉的? 哪种实施方式以及哪种情况首选?

如何在Java Fork / Join框架中显示工作窃取?

我想改进我的fork / join小例子,以表明在Java Fork / Join框架执行工作中发生窃取。 我需要对代码进行哪些更改? 示例目的:只需对多个线程之间的值分解进行线性研究。 package com.stackoverflow.questions; import java.util.LinkedList; import java.util.List; import java.util.Random; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; public class CounterFJ<T extends Comparable> extends RecursiveTask { private static final long serialVersionUID = 5075739389907066763L; private List _list; private T _test; private int _lastCount = -1; private int _start; private int _end; private int _divideFactor […]

Java Fork-Join(Java 8)中出现意外的可伸缩性

最近,我使用Java Fork-Join运行了一些可扩展性实验。 在这里,我使用了非默认的ForkJoinPool构造函数ForkJoinPool(int parallelism) ,将所需的并行性(#workers)作为构造函数参数传递。 具体来说,使用以下代码: public static void main(String[] args) throws InterruptedException { ForkJoinPool pool = new ForkJoinPool(Integer.parseInt(args[0])); pool.invoke(new ParallelLoopTask()); } static class ParallelLoopTask extends RecursiveAction { final int n = 1000; @Override protected void compute() { RecursiveAction[] T = new RecursiveAction[n]; for(int p = 0; p < n; p++){ T[p] = new DummyTask(); […]

Java ForkJoinPool具有非递归任务,是否可以正常工作?

我想通过一种方法将Runnable任务提交到ForkJoinPool: forkJoinPool.submit(Runnable task) 注意,我使用的是JDK 7。 在引擎盖下,它们被转换为ForkJoinTask对象。 我知道当一个任务以递归方式分成较小的任务时,ForkJoinPool是有效的。 题: 如果没有递归,窃取工作仍然可以在ForkJoinPool中工作吗? 在这种情况下值得吗? 更新1:任务很小,可能不平衡。 即使对于严格相同的任务,诸如上下文切换,线程调度,停车,页面未命中等事情也会导致导致不平衡 。 更新2: Doug Lea在并发JSR-166兴趣小组中写道,给出了一个暗示: 当所有任务都是异步并提交到池而不是分叉时,这也极大地提高了吞吐量,这成为构造actor框架的合理方法,以及许多您可能使用ThreadPoolExecutor的普通服务。 我认为,当涉及相当小的CPU绑定任务时,由于这种优化,ForkJoinPool是可行的方法。 重点是这些任务已经很小,不需要递归分解。 无论是大工作还是小任务, 工作窃取工作都可以被来自忙碌工人的Deque尾巴的另一个自由工作者抓住。 更新3: ForkJoinPool的可扩展性 – Akka乒乓球队的基准测试显示了很好的结果。 尽管如此,要更有效地应用ForkJoinPool还需要进行性能调整。

Java的fork-and-join线程池是否适合执行IO绑定任务?

在我的应用程序中,我必须通过执行许多network-io绑定任务来解决问题,有时一个io绑定任务并分成更小的io绑定任务。 这些任务目前正在使用Java的标准线程池机制执行。 我想知道我是否可以转向fork-and-join框架? 但问题是,forkandjoin框架通常用于解决io绑定操作或CPU绑定吗? 我假设它们主要用于CPU绑定操作,因为fork-and-join框架利用工作窃取技术来利用多核处理器,但如果我将它用于IO绑定任务,会不会有任何不利影响?

Java 7:Fork / Join框架

有人能解释一下Fork / Join是什么吗?

Fork-Join相关:join()vs get()vs invoke()

是否有必要使用join()和fork()或者我也可以使用join() , get() , invoke() 。 我检查了API ,除了get()抛出InterruptedException和ExecutionException我没有看到差异……而invoke()似乎完全相同。 但是我总是看到与join()相关的fork()而不是其他两种方法……它们不提供并行性吗? 将invoke()和join()完全相同的目的是什么? 通过实现future,我可以理解get(),但是invoke()和join()呢。 提前致谢。 编辑 :我在API中的不好实际引用了它,因为已经收到的答案指出了它。 但他们的意思是: 方法invoke()在语义上等同于fork(); join()但总是尝试在当前线程中开始执行 提前致谢。

ForkJoinPool在invokeAll / join期间停止

我尝试使用ForkJoinPool来并行化我的CPU密集型计算。 我对ForkJoinPool的理解是,只要任何任务可以执行,它就会继续工作。 不幸的是,我经常观察到工作线程空闲/等待,因此并非所有CPU都保持忙碌状态。 有时我甚至观察到额外的工作线程。 我没想到这一点,因为我严格尝试使用非阻塞任务。 我的观察非常类似于ForkJoinPool似乎浪费了一个线程 。 在调试了很多ForkJoinPool之后,我有一个猜测: 我使用invokeAll()在子任务列表上分配工作。 在invokeAll()完成后执行第一个任务本身,它开始加入其他任务。 这样可以正常工作,直到下一个要连接的任务位于执行队列之上。 不幸的是,我提交了异步的其他任务而没有加入它们。 我期望ForkJoin框架首先继续执行这些任务,然后再转回加入任何剩余的任务。 但它似乎不是这样工作的。 相反,工作线程停止调用wait()直到等待的任务准备好(可能由其他工作线程执行)。 我没有validation这一点,但似乎是调用join()的一般缺陷。 ForkJoinPool提供了一个asyncMode ,但这是一个全局参数,不能用于单独的提交。 但我喜欢看到我的异步分叉任务很快就会执行。 那么,为什么ForkJoinTask.doJoin()不是简单地在其队列之上执行任何可用任务,直到它准备好(由自己执行或被其他人窃取)?

为什么并行性ForkJoinPool加倍我的exception?

假设我有如下代码: Future executeBy(ExecutorService executor) { return executor.submit(() -> { throw new IllegalStateException(); }); } 使用ForkJoinPool#commonPool时没有问题,但是当我使用并行性ForkJoinPool它会使IllegalStateException加倍。 例如: executeBy(new ForkJoinPool(1)).get(); // ^— double the IllegalStateException Q1 :为什么ForkJoinPool将Exception发生在Callable ? Q2 :如何避免这种奇怪的行为?