有没有办法强制parallelStream()并行?

如果输入大小太小,则库会自动序列化流中映射的执行 ,但此自动化不会并且不能考虑映射操作的重要程度。 有没有办法强制parallelStream()实际并行化CPU 图?

似乎存在一个根本性的误解。 由于OP没有看到预期的加速,链接的问答讨论了流显然不能并行工作。 结论是,如果工作负载太小,并行处理没有任何好处而不是自动回退到顺序执行。

实际上恰恰相反。 如果您请求并行,即使它实际上降低了性能,您也会得到并行。 在这种情况下,实现不会切换到可能更有效的顺序执行。

因此,如果您确信每个元素的工作负载足以certificate使用并行执行是合理的,无论元素数量少,您都可以简单地请求并行执行。

可以很容易地certificate:

 Stream.of(1, 2).parallel() .peek(x -> System.out.println("processing "+x+" in "+Thread.currentThread())) .forEach(System.out::println); 

在Ideone上 ,它会打印出来

 processing 2 in Thread[main,5,main] 2 processing 1 in Thread[ForkJoinPool.commonPool-worker-1,5,main] 1 

但邮件和详细信息的顺序可能会有所不同。 甚至可能在某些环境中,两个任务都可能碰巧由同一​​个线程执行,如果它可以在另一个线程开始拾取它之前完成第二个任务。 但是,当然,如果任务足够昂贵,这种情​​况就不会发生。 重要的一点是,整体工作负载已被拆分并入队,可能被其他工作线程拾取。

如果您的环境中针对上述简单示例执行了单个线程的执行,则可以插入模拟工作负载,如下所示:

 Stream.of(1, 2).parallel() .peek(x -> System.out.println("processing "+x+" in "+Thread.currentThread())) .map(x -> { LockSupport.parkNanos("simulated workload", TimeUnit.SECONDS.toNanos(3)); return x; }) .forEach(System.out::println); 

然后,如果“ 每个元素的处理时间 ”足够高,您还可能会看到整体执行时间将短于“ 元素数量 ”ד 每个元素处理时间 ”。


更新:误解可能是由于Brian Goetz的误导性陈述:“在您的情况下,您的输入集太小而无法分解”。

必须强调的是,这不是Stream API的一般属性,而是已使用的MapHashMap有一个支持数组,条目在该数组中分布,具体取决于它们的哈希码。 可能的情况是,将数组拆分为n个范围不会导致所包含元素的平衡拆分,尤其是如果只有两个。 HashMapSpliterator的实现者考虑在数组中搜索元素以获得完美平衡的分割太昂贵,而不是分割两个元素是不值得的。

由于HashMap的默认容量为16 ,并且示例只有两个元素,我们可以说地图是超大的。 简单地修复它也会修复这个例子:

 long start = System.nanoTime(); Map> input = new HashMap<>(2); input.put("1", () -> { System.out.println(Thread.currentThread()); LockSupport.parkNanos("simulated workload", TimeUnit.SECONDS.toNanos(2)); return "a"; }); input.put("2", () -> { System.out.println(Thread.currentThread()); LockSupport.parkNanos("simulated workload", TimeUnit.SECONDS.toNanos(2)); return "b"; }); Map results = input.keySet() .parallelStream().collect(Collectors.toConcurrentMap( key -> key, key -> input.get(key).get())); System.out.println("Time: " + TimeUnit.NANOSECONDS.toMillis(System.nanoTime()- start)); 

在我的机器上,它打印

 Thread[main,5,main] Thread[ForkJoinPool.commonPool-worker-1,5,main] Time: 2058 

结论是,无论输入大小如何,Stream实现总是尝试使用并行执行(如果您请求它)。 但这取决于输入的结构 ,工作负载可以分配给工作线程的程度。 事情可能更糟,例如,如果您从文件中流式传输线条。

如果您认为平衡拆分的好处值得复制步骤的成本,您还可以使用new ArrayList<>(input.keySet()).parallelStream()而不是input.keySet().parallelStream() ,因为ArrayList中元素的分布总是允许完全平衡的分割。