Java 8 Stream API: why distinguish between sequential and parallel execution modes?

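From the Stream javadoc:

"Stream pipelines may execute either sequentially or in parallel. This execution mode is a property of the stream. Streams are created with an initial choice of sequential or parallel execution."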
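For reference, the mode described in that passage can be inspected with isParallel() and switched with parallel() and sequential(). The following is only a minimal sketch; the class name ExecutionModeDemo and the sample list are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;

final class ExecutionModeDemo {

    /** Returns the execution mode reported by three differently built pipelines. */
    static boolean[] modes() {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4);
        return new boolean[] {
            // Collection.stream() starts out sequential.
            numbers.stream().isParallel(),
            // parallel() switches the mode of the pipeline.
            numbers.stream().parallel().isParallel(),
            // The mode is a single property of the pipeline: the last call wins.
            numbers.stream().parallel().sequential().isParallel()
        };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(modes()));
    }
}
```

Because the mode belongs to the pipeline as a whole, calling parallel() and then sequential() does not run one stage in parallel and the next sequentially.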

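My assumptions:

  1. There is no functional difference between sequential and parallel streams. The output is never affected by the execution mode.
  2. Parallel streams are always preferable because of the performance gain, given an appropriate number of cores and a problem size large enough to justify the overhead.
  3. We want to write code once and run it anywhere without having to care about the hardware (this is Java, after all).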

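Assuming these assumptions are valid (nothing wrong with a bit of meta-assumption), what is the value of exposing the execution mode in the API?

It seems like you should be able to just declare a Stream, and the choice between sequential and parallel execution should be handled automatically in a layer below, either by library code or by the JVM itself, as a function of the cores available at runtime, the size of the problem, and so on.

Sure, assuming parallel streams also work on a single-core machine, perhaps always using a parallel stream would achieve this. But that is really ugly: why should my code contain explicit references to parallel streams when that is the default option?

Even if there is a scenario where you deliberately want to hard-code a sequential stream, why not provide a sub-interface SequentialStream for that purpose, rather than polluting Stream with an execution-mode switch?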

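"It seems like you should be able to just declare a Stream, and the choice between sequential and parallel execution should be handled automatically in a layer below, either by library code or by the JVM itself, as a function of the cores available at runtime, the size of the problem, and so on."

The reality is that a) streams are a library with no special JVM magic, and b) you cannot really design a library smart enough to work out automatically what the right decision is in a particular case. There is no sensible way to estimate how costly a particular function will be without running it (even if you could introspect its implementation, which you can't). You would end up introducing a benchmark into every stream operation to find out whether parallelizing it is worth the cost of the parallelism overhead. That is not practical, especially given that you do not know in advance how bad the parallelism overhead is, either.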
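Since the library cannot make this call, the decision ends up in application code. One way that might look is sketched below; ModeChooser, PARALLEL_THRESHOLD and the expensivePerElement flag are hypothetical names, and the threshold value is a placeholder that would have to come from measuring your own workload.

```java
import java.util.List;
import java.util.stream.Stream;

final class ModeChooser {

    // Hypothetical cut-off, not a recommended value: measure before choosing one.
    static final int PARALLEL_THRESHOLD = 10_000;

    /**
     * Returns a parallel stream only when the caller says the per-element work
     * is expensive, or when the list is at least PARALLEL_THRESHOLD elements long.
     */
    static <T> Stream<T> streamFor(List<T> items, boolean expensivePerElement) {
        boolean parallel = expensivePerElement || items.size() >= PARALLEL_THRESHOLD;
        return parallel ? items.stream().parallel() : items.stream();
    }
}
```

The knowledge encoded in expensivePerElement is exactly what the library has no way to infer: only the caller knows what the lambda passed to map or forEach will cost.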

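"Parallel streams are always preferable because of the performance gain, given an appropriate number of cores and a problem size large enough to justify the overhead."

That is not always the case in practice. Some tasks are so small that they are not worth parallelizing, and parallelism always carries some overhead. (Frankly, most programmers tend to overestimate the usefulness of parallelism, slapping it on everywhere even when it actually hurts performance.)

Basically, this is a hard enough problem that you have to push it onto the programmer.

There is an interesting case in this question showing that a parallel stream can sometimes be slower by orders of magnitude. In that particular example, the parallel version ran for ten minutes while the sequential version took a few seconds.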

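"There is no functional difference between sequential and parallel streams. The output is never affected by the execution mode."

There is a difference between sequential and parallel stream execution. In the benchmark at the end of this answer, the TEST_2 results show that parallel execution is much faster than the sequential approach.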
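Apart from timing, the two modes can also differ in observable behaviour for operations that do not promise to respect encounter order, such as forEach. The sketch below is separate from the benchmark; the class name OrderingDemo and the 1..1000 input are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class OrderingDemo {

    /** The numbers 1..1000 in ascending order. */
    static List<Integer> source() {
        return IntStream.rangeClosed(1, 1000).boxed().collect(Collectors.toList());
    }

    /** forEach on a parallel stream: elements may arrive in any order. */
    static List<Integer> viaForEach(List<Integer> in) {
        List<Integer> out = Collections.synchronizedList(new ArrayList<>());
        in.parallelStream().forEach(out::add);
        return new ArrayList<>(out);
    }

    /** forEachOrdered respects encounter order even in parallel mode. */
    static List<Integer> viaForEachOrdered(List<Integer> in) {
        List<Integer> out = Collections.synchronizedList(new ArrayList<>());
        in.parallelStream().forEachOrdered(out::add);
        return new ArrayList<>(out);
    }

    public static void main(String[] args) {
        List<Integer> in = source();
        System.out.println("forEach kept order:        " + viaForEach(in).equals(in));
        System.out.println("forEachOrdered kept order: " + viaForEachOrdered(in).equals(in));
    }
}
```

The first line printed can vary from run to run and from machine to machine, which is the point: with forEach the result depends on the execution mode, while forEachOrdered gives up some parallelism to keep the order stable.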

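"Parallel streams are always preferable because of the performance gain, given an appropriate number of cores and a problem size large enough to justify the overhead."

Not really. If the task is not worth executing in parallel threads (a simple task), then we are just adding overhead to the code. The TEST_1 results show this. Also note that if all the worker threads are busy with one parallel execution task, other parallel stream operations elsewhere in the code will wait for it.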
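The waiting happens because, in the OpenJDK implementation, parallel streams by default run their tasks on the shared common ForkJoinPool plus the calling thread. A small sketch to see which threads take part; CommonPoolDemo is a made-up name, and the set of thread names it prints depends on the machine and the JDK.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

final class CommonPoolDemo {

    /** Target parallelism of the JVM-wide common pool. */
    static int commonParallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    /** Names of the threads that executed at least one element of a parallel stream. */
    static Set<String> threadsUsed() {
        Set<String> names = ConcurrentHashMap.newKeySet();
        IntStream.range(0, 10_000)
                 .parallel()
                 .forEach(i -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        System.out.println("common pool parallelism: " + commonParallelism());
        System.out.println("threads used: " + threadsUsed());
    }
}
```

Every parallel stream in the JVM that does not arrange otherwise competes for the same workers, so a blocking call inside one pipeline can stall unrelated pipelines.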

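"We want to write code once and run it anywhere without having to care about the hardware (this is Java, after all)."

Only the programmer knows whether a task is worth executing in parallel or sequentially, irrespective of the CPU. That is why the Java API exposes both options to the developer.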

    import java.util.ArrayList;
    import java.util.List;

    /*
     * Performance test over internal (parallel/sequential) and external iterations.
     * https://docs.oracle.com/javase/tutorial/collections/streams/parallelism.html
     *
     * Parallel computing involves dividing a problem into subproblems,
     * solving those problems simultaneously (in parallel, with each subproblem
     * running in a separate thread), and then combining the results of the
     * solutions to the subproblems. Java SE provides the fork/join framework,
     * which enables you to more easily implement parallel computing in your
     * applications. However, with this framework, you must specify how the
     * problems are subdivided (partitioned). With aggregate operations, the
     * Java runtime performs this partitioning and combining of solutions for you.
     *
     * You can limit the parallelism that the ForkJoinPool offers by supplying
     * -Djava.util.concurrent.ForkJoinPool.common.parallelism=1, so that the
     * pool size is limited to one and nothing is gained from parallelization.
     *
     * @see ForkJoinPool
     * https://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html
     *
     * ForkJoinPool creates a fixed number of threads (default: number of cores)
     * and will never create more threads (unless the application indicates a
     * need for those by using managedBlock).
     *
     * http://stackoverflow.com/questions/10797568/what-determines-the-number-of-threads-a-java-forkjoinpool-creates
     */
    public class IterationThroughStream {

        // volatile: written from fork/join worker threads, read from main.
        private static volatile boolean found = false;
        private static List<Integer> smallListOfNumbers = null;

        public static void main(String[] args) throws InterruptedException {

            // TEST_1: a cheap task over a large list.
            List<String> bigListOfStrings = new ArrayList<>();
            for (long i = 1L; i <= 1000000L; i++) {
                bigListOfStrings.add("Counter no: " + i);
            }

            System.out.println("Test Start");
            System.out.println("-----------");

            long startExternalIteration = System.currentTimeMillis();
            externalIteration(bigListOfStrings);
            long endExternalIteration = System.currentTimeMillis();
            System.out.println("Time taken for externalIteration(bigListOfStrings) is :"
                    + (endExternalIteration - startExternalIteration)
                    + " , and the result found: " + found);

            long startInternalIteration = System.currentTimeMillis();
            internalIteration(bigListOfStrings);
            long endInternalIteration = System.currentTimeMillis();
            System.out.println("Time taken for internalIteration(bigListOfStrings) is :"
                    + (endInternalIteration - startInternalIteration)
                    + " , and the result found: " + found);

            // TEST_2: a slow (blocking) task over a small list.
            smallListOfNumbers = new ArrayList<>();
            for (int i = 1; i <= 10; i++) {
                smallListOfNumbers.add(i);
            }

            long startExternalIteration1 = System.currentTimeMillis();
            externalIterationOnSleep(smallListOfNumbers);
            long endExternalIteration1 = System.currentTimeMillis();
            System.out.println("Time taken for externalIterationOnSleep(smallListOfNumbers) is :"
                    + (endExternalIteration1 - startExternalIteration1));

            long startInternalIteration1 = System.currentTimeMillis();
            internalIterationOnSleep(smallListOfNumbers);
            long endInternalIteration1 = System.currentTimeMillis();
            System.out.println("Time taken for internalIterationOnSleep(smallListOfNumbers) is :"
                    + (endInternalIteration1 - startInternalIteration1));

            // TEST_3: four threads each running a parallel stream at the same time.
            Thread t1 = new Thread(IterationThroughStream::internalIterationOnThread);
            Thread t2 = new Thread(IterationThroughStream::internalIterationOnThread);
            Thread t3 = new Thread(IterationThroughStream::internalIterationOnThread);
            Thread t4 = new Thread(IterationThroughStream::internalIterationOnThread);

            t1.start();
            t2.start();
            t3.start();
            t4.start();

            Thread.sleep(30000);
        }

        private static boolean externalIteration(List<String> bigListOfStrings) {
            found = false;
            for (String s : bigListOfStrings) {
                if (s.equals("Counter no: 1000000")) {
                    found = true;
                }
            }
            return found;
        }

        private static boolean internalIteration(List<String> bigListOfStrings) {
            found = false;
            bigListOfStrings.parallelStream().forEach((String s) -> {
                if (s.equals("Counter no: 1000000")) { // Have a breakpoint to look how many threads are spawned.
                    found = true;
                }
            });
            return found;
        }

        private static boolean externalIterationOnSleep(List<Integer> smallListOfNumbers) {
            found = false;
            for (Integer s : smallListOfNumbers) {
                try {
                    Thread.sleep(100);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            return found;
        }

        private static boolean internalIterationOnSleep(List<Integer> smallListOfNumbers) {
            found = false;
            // Replacing parallelStream() with stream() makes this single threaded (sequential access).
            smallListOfNumbers.parallelStream().forEach((Integer s) -> {
                try {
                    Thread.sleep(100); // Have a breakpoint to look how many threads are spawned.
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
            return found;
        }

        public static void internalIterationOnThread() {
            smallListOfNumbers.parallelStream().forEach((Integer s) -> {
                try {
                    /*
                     * DANGEROUS
                     * This shows that if all the 7 FJP (fork/join pool) worker threads
                     * are blocked on behalf of one single thread (e.g. t1), then the
                     * other three normal threads (t2 - t4) won't make progress in
                     * parallel; they will wait for FJP worker threads.
                     */
                    Thread.sleep(100); // Have a breakpoint here.
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
    }