为什么并行流不使用ForkJoinPool的所有线程?

所以我知道如果你在没有自定义ForkJoinPool的情况下使用parallelStream ,它将使用默认的ForkJoinPool,默认情况下,只有少一个线程,因为你有处理器。

因此,如此处所述 (以及该问题的另一个答案)为了获得更多的并行性,您必须:

将并行流执行提交给您自己的ForkJoinPool:yourFJP.submit(() – > stream.parallel()。forEach(doSomething));

所以,我这样做了:

 import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.stream.IntStream; import com.google.common.collect.Sets; public class Main { public static void main(String[] args) throws InterruptedException, ExecutionException { ForkJoinPool forkJoinPool = new ForkJoinPool(1000); IntStream stream = IntStream.range(0, 999999); final Set thNames = Collections.synchronizedSet(new HashSet()); forkJoinPool.submit(() -> { stream.parallel().forEach(n -> { System.out.println("Processing n: " + n); try { Thread.sleep(500); thNames.add(Thread.currentThread().getName()); System.out.println("Size: " + thNames.size() + " activeCount: " + forkJoinPool.getActiveThreadCount()); } catch (Exception e) { throw new RuntimeException(e); } }); }).get(); } } 

我创建了一组线程名称,以便查看正在创建的线程数,并记录了池所具有的活动线程数,并且这两个数字的长度都不超过16,这意味着这里的并行性是不超过16(为什么甚至16?)。 如果我不使用forkJoinPool,我会得到4作为并行性,这取决于我拥有的处理器数量。

为什么它给我16而不是1000?

更新

最初这个答案是一个精心设计的解释,声称ForkJoinPool应用了背压,甚至没有达到规定的并行度,因为总有空闲的工作人员可以处理流。

那是不对的。

实际答案在原始问题中提供,其被标记为重复 – 使用自定义ForkJoinPool进行流处理不受官方支持,并且在使用forEach ,默认池并行性用于确定流分裂器行为。

下面是一个示例,当手动将任务提交到自定义ForkJoinPool ,池的活动线程数很容易达到其并行度级别:

 for (int i = 0; i < 1_000_000; ++i) { forkJoinPool.submit(() -> { try { Thread.sleep(1); thNames.add(Thread.currentThread().getName()); System.out.println("Size: " + thNames.size() + " activeCount: " + forkJoinPool.getActiveThreadCount() + " parallelism: " + forkJoinPool.getParallelism()); } catch (Exception e) { throw new RuntimeException(e); } }); } 

感谢Stuart Marks指出这一点,并向Sotirios Delimanolis争辩说我原来的答案是错误的:)

在我看来,当你向FJP提交lambda时,lambda将使用公共池而不是FJP。 Sotirios Delimanolis在上面的评论certificate了这一点。 您提交的内容是在您的FJP中运行的任务。

尝试分析此代码以查看实际使用的线程。

不能在FJP中命名线程。