Java和两个double 并行流

假设我有这两个矩阵:

double[][] a = new double[2][2] a[0][0] = 1 a[0][1] = 2 a[1][0] = 3 a[1][1] = 4 double[][] b = new double[2][2] b[0][0] = 1 b[0][1] = 2 b[1][0] = 3 b[1][1] = 4 

以传统的方式,总结这个矩阵我会做一个嵌套for循环:

 int rows = a.length; int cols = a[0].length; double[][] res = new double[rows][cols]; for(int i = 0; i < rows; i++){ for(int j = 0; j < cols; j++){ res[i][j] = a[i][j] + b[i][j]; } } 

我对流API很新,但我认为这非常适合与parallelStream一起使用,所以我的问题是如果有办法做到这一点并利用并行处理?

编辑:不确定这是不是正确的地方,但我们在这里:使用一些建议我把Stream推到测试。 设置如下:经典方法:

 public class ClassicMatrix { private final double[][] components; private final int cols; private final int rows; public ClassicMatrix(final double[][] components){ this.components = components; this.rows = components.length; this.cols = components[0].length; } public ClassicMatrix addComponents(final ClassicMatrix a) { final double[][] res = new double[rows][cols]; for (int i = 0; i < rows; i++) { for (int j = 0; j < rows; j++) { res[i][j] = components[i][j] + a.components[i][j]; } } return new ClassicMatrix(res); } } 

使用@dkatzel建议:

 public class MatrixStream1 { private final double[][] components; private final int cols; private final int rows; public MatrixStream1(final double[][] components){ this.components = components; this.rows = components.length; this.cols = components[0].length; } public MatrixStream1 addComponents(final MatrixStream1 a) { final double[][] res = new double[rows][cols]; IntStream.range(0, rows*cols).parallel().forEach(i -> { int x = i/rows; int y = i%rows; res[x][y] = components[x][y] + a.components[x][y]; }); return new MatrixStream1(res); } } 

使用@Eugene建议:

 public class MatrixStream2 { private final double[][] components; private final int cols; private final int rows; public MatrixStream2(final double[][] components) { this.components = components; this.rows = components.length; this.cols = components[0].length; } public MatrixStream2 addComponents(final MatrixStream2 a) { final double[][] res = new double[rows][cols]; IntStream.range(0, rows) .forEach(i -> Arrays.parallelSetAll(res[i], j -> components[i][j] * a.components[i][j])); return new MatrixStream2(res); } } 

和一个测试类,每个方法运行3次独立时间(只需替换main()中的方法名称):

 public class MatrixTest { private final static String path = "/media/manuel/workspace/data/"; public static void main(String[] args) { final List lst = new ArrayList(); for (int i = 100; i < 8000; i = i + 400) { final Double[] d = testClassic(i); System.out.println(d[0] + " : " + d[1]); lst.add(d); } IOUtils.saveToFile(path + "classic.csv", lst); } public static Double[] testClassic(final int i) { final ClassicMatrix a = new ClassicMatrix(rand(i)); final ClassicMatrix b = new ClassicMatrix(rand(i)); final long start = System.currentTimeMillis(); final ClassicMatrix mul = a.addComponents(b); final long now = System.currentTimeMillis(); final double elapsed = (now - start); return new Double[] { (double) i, elapsed }; } public static Double[] testStream1(final int i) { final MatrixStream1 a = new MatrixStream1(rand(i)); final MatrixStream1 b = new MatrixStream1(rand(i)); final long start = System.currentTimeMillis(); final MatrixStream1 mul = a.addComponents(b); final long now = System.currentTimeMillis(); final double elapsed = (now - start); return new Double[] { (double) i, elapsed }; } public static Double[] testStream2(final int i) { final MatrixStream2 a = new MatrixStream2(rand(i)); final MatrixStream2 b = new MatrixStream2(rand(i)); final long start = System.currentTimeMillis(); final MatrixStream2 mul = a.addComponents(b); final long now = System.currentTimeMillis(); final double elapsed = (now - start); return new Double[] { (double) i, elapsed }; } private static double[][] rand(final int size) { final double[][] rnd = new double[size][size]; for (int i = 0; i < size; i++) { for (int j = 0; j < size; j++) { rnd[i][j] = Math.random(); } } return rnd; } } 

结果:

 Classic Matrix size, Time (ms) 100.0,1.0 500.0,5.0 900.0,5.0 1300.0,43.0 1700.0,94.0 2100.0,26.0 2500.0,33.0 2900.0,46.0 3300.0,265.0 3700.0,71.0 4100.0,87.0 4500.0,380.0 4900.0,432.0 5300.0,215.0 5700.0,238.0 6100.0,577.0 6500.0,677.0 6900.0,609.0 7300.0,584.0 7700.0,592.0 Stream1, Time(ms) 100.0,86.0 500.0,13.0 900.0,9.0 1300.0,47.0 1700.0,92.0 2100.0,29.0 2500.0,33.0 2900.0,46.0 3300.0,253.0 3700.0,71.0 4100.0,90.0 4500.0,352.0 4900.0,373.0 5300.0,497.0 5700.0,485.0 6100.0,579.0 6500.0,711.0 6900.0,800.0 7300.0,780.0 7700.0,902.0 Stream2, Time(ms) 100.0,111.0 500.0,42.0 900.0,12.0 1300.0,54.0 1700.0,97.0 2100.0,110.0 2500.0,177.0 2900.0,71.0 3300.0,250.0 3700.0,106.0 4100.0,359.0 4500.0,143.0 4900.0,233.0 5300.0,261.0 5700.0,289.0 6100.0,406.0 6500.0,814.0 6900.0,830.0 7300.0,828.0 7700.0,911.0 

我制作了一个更好比较的情节: 性能测试

根本没有任何改善。 哪个是缺陷? 矩阵是否小(7700 x 7700)? 大于此,它会炸毁我的计算机内存。

一种方法是使用Arrays.parallelSetAll

 int rows = a.length; int cols = a[0].length; double[][] res = new double[rows][cols]; Arrays.parallelSetAll(res, i -> { Arrays.parallelSetAll(res[i], j -> a[i][j] + b[i][j]); return res[i]; }); 

我不是100%肯定,但我认为对Arrays.parallelSetAll的内部调用可能不值得为每行的列生成内部并行化的开销。 也许它只够并行化每行的总和:

 Arrays.parallelSetAll(res, i -> { Arrays.setAll(res[i], j -> a[i][j] + b[i][j]); return res[i]; }); 

无论如何,在将并行化添加到算法之前,您应该仔细测量,因为很多时候开销太大而不值得使用它。

这还没有测量(稍后我会),但是不应该已经在Arrays.parallelSetAll构建,以最快的方式完成工作吗?

  for (int i = 0; i < a.length; ++i) { int j = i; Arrays.parallelSetAll(r[j], x -> a[j][x] + b[j][x]); } 

甚至更好:

 IntStream.range(0, a.length) .forEach(i -> Arrays.parallelSetAll(r[i], j -> a[i][j] + b[i][j])); 

这对CPU缓存也很有用,因为下一个条目在同一缓存行中的概率很大。 以相反的顺序(列和行)执行读取将分散读取所有地方。

我在这里放了一个jmh测试。 请注意, Federico的答案是最快的。 上去投票他的想法。

结果如下:

 Benchmark (howManyEntries) Mode Cnt Score Error Units DoubleArraySum.dkatzel 100 avgt 10 0.055 ± 0.005 ms/op DoubleArraySum.dkatzel 500 avgt 10 0.997 ± 0.156 ms/op DoubleArraySum.dkatzel 1000 avgt 10 4.162 ± 0.368 ms/op DoubleArraySum.dkatzel 3000 avgt 10 39.619 ± 4.391 ms/op DoubleArraySum.dkatzel 8000 avgt 10 236.468 ± 41.599 ms/op DoubleArraySum.eugene 100 avgt 10 0.671 ± 0.187 ms/op DoubleArraySum.eugene 500 avgt 10 6.317 ± 0.268 ms/op DoubleArraySum.eugene 1000 avgt 10 14.751 ± 0.676 ms/op DoubleArraySum.eugene 3000 avgt 10 65.174 ± 6.044 ms/op DoubleArraySum.eugene 8000 avgt 10 285.571 ± 23.206 ms/op DoubleArraySum.federico1 100 avgt 10 0.169 ± 0.010 ms/op DoubleArraySum.federico1 500 avgt 10 1.999 ± 0.217 ms/op DoubleArraySum.federico1 1000 avgt 10 6.087 ± 1.108 ms/op DoubleArraySum.federico1 3000 avgt 10 40.825 ± 4.853 ms/op DoubleArraySum.federico1 8000 avgt 10 267.446 ± 37.490 ms/op DoubleArraySum.federico2 100 avgt 10 0.034 ± 0.003 ms/op DoubleArraySum.federico2 500 avgt 10 0.974 ± 0.152 ms/op DoubleArraySum.federico2 1000 avgt 10 3.245 ± 0.080 ms/op DoubleArraySum.federico2 3000 avgt 10 30.503 ± 5.960 ms/op DoubleArraySum.federico2 8000 avgt 10 183.183 ± 21.861 ms/op DoubleArraySum.holijava 100 avgt 10 0.063 ± 0.002 ms/op DoubleArraySum.holijava 500 avgt 10 1.112 ± 0.020 ms/op DoubleArraySum.holijava 1000 avgt 10 4.138 ± 0.062 ms/op DoubleArraySum.holijava 3000 avgt 10 41.784 ± 1.029 ms/op DoubleArraySum.holijava 8000 avgt 10 266.590 ± 4.080 ms/op DoubleArraySum.pivovarit 100 avgt 10 0.112 ± 0.002 ms/op DoubleArraySum.pivovarit 500 avgt 10 2.427 ± 0.075 ms/op DoubleArraySum.pivovarit 1000 avgt 10 9.572 ± 0.355 ms/op DoubleArraySum.pivovarit 3000 avgt 10 84.413 ± 2.197 ms/op DoubleArraySum.pivovarit 8000 avgt 10 690.942 ± 34.993 ms/op 

编辑

这是一个更易读的输出(federico赢得所有输入)

 100=[federico2, dkatzel, holijava, pivovarit, federico1, eugene] 500=[federico2, dkatzel, holijava, federico1, pivovarit, eugene] 1000=[federico2, holijava, dkatzel, federico1, pivovarit, eugene] 3000=[federico2, dkatzel, federico1, holijava, eugene, pivovarit] 8000=[federico2, dkatzel, holijava, federico1, eugene, pivovarit] 

我在这里看到的唯一选择是更多/更少生成所有可能的索引对,然后提取元素并应用求和。 使用并行流不会有这样一个小例子的任何额外的积极效果,但你可以肯定在这里使用Stream API(如果需要立即转换为并行),虽然结果并不像预期的那样好:

 IntStream.range(0, a.length).boxed() .flatMap(i -> IntStream.range(0, a[0].length) .mapToObj(j -> new AbstractMap.SimpleImmutableEntry<>(i, j))) .parallel() .forEach(e -> { res[e.getKey()][e.getValue()] = a[e.getKey()][e.getValue()] + b[e.getKey()][e.getValue()]; }); 

我们需要引入一个中间人(中间对?),以便我们可以并行化一个Stream而不是使用并行化的嵌套Streams

另一种高级方法是实现自己的自定义收集器,但在某些时候仍然会涉及嵌套循环。


尝试对两个数组中的所有值求和时,可以观察到Stream API的真正function:

 Stream.concat(Arrays.stream(a), Arrays.stream(b)).parallel() .flatMapToDouble(Arrays::stream) .sum(); 

您可以使用IntStream在矩阵中的单元格数量上创建流,然后进行一些数学运算以将该int转换为矩阵位置。

 IntStream.range(0, rows*cols) .parallel() .forEach( i->{ int x = i/rows; int y = i%rows; res[x][y] = a[x][y] + b[x][y]; }); 

这个问题的其他答案不仅是错误的(在撰写本文时),而是创建了多个影响性能的Streams,甚至也没有并行

正如@Holger指出的那样,虽然这个单一流可能更容易阅读,但是分区和模数的性能成本将使其比仅有添加的Stream of Stream更慢,直到有许多内核。 我不确定需要补偿多少

这个怎么样?

 double[][] res = IntStream.range(0, a.length).parallel() .mapToObj(i -> IntStream.range(0, a[i].length) .mapToDouble(j -> a[i][j] + b[i][j]) .toArray() ) .toArray(double[][]::new); System.out.println(res); // ^--- [[2., 4.], [6., 8.]]