在Apache Spark中,我可以轻松地重复/嵌套SparkContext.parallelize吗?

我正在尝试模拟我们正试图解决的遗传问题,逐步建立起来。 我可以成功运行Spark示例中的PiAverage示例。 这个例子在一个圆圈(在我们的例子中为10 ^ 6)“投掷飞镖”并计算“在圆圈中着陆”以估计PI的数量

假设我想重复该过程1000次(并行)并平均所有这些估计值。 我试图看到最好的方法,似乎有两个并行化的调用? 嵌套电话? 有没有办法将地图链接起来或减少一起呼叫? 我看不到它。

我想知道下面这个想法的智慧。 我想过使用累加器跟踪得到的估计值。 jsc是我的SparkContext,单个运行的完整代码是在问题的结尾,感谢任何输入!

Accumulator accum = jsc.accumulator(0.0); // make a list 1000 long to pass to parallelize (no for loops in Spark, right?) List numberOfEstimates = new ArrayList(HOW_MANY_ESTIMATES); // pass this "dummy list" to parallelize, which then // calls a pieceOfPI method to produce each individual estimate // accumulating the estimates. PieceOfPI would contain a // parallelize call too with the individual test in the code at the end jsc.parallelize(numberOfEstimates).foreach(accum.add(pieceOfPI(jsc, numList, slices, HOW_MANY_ESTIMATES))); // get the value of the total of PI estimates and print their average double totalPi = accum.value(); // output the average of averages System.out.println("The average of " + HOW_MANY_ESTIMATES + " estimates of Pi is " + totalPi / HOW_MANY_ESTIMATES); 

它似乎不是我在SO上看到的矩阵或其他答案给出了这个特定问题的答案,我已经做了几次搜索,但我没有看到如何在没有“并行化并行化”的情况下做到这一点。 这是个坏主意吗?

(是的,我在数学上意识到我可以做更多的估计,并有效地得到相同的结果:)试图建立我的老板想要的结构,再次感谢!

如果有帮助的话,我已将我的整个单一测试程序放在这里,没有我正在测试的累加器。 其核心将成为PieceOfPI():

 import java.io.Serializable; import java.util.ArrayList; import java.util.List; import org.apache.spark.Accumulable; import org.apache.spark.Accumulator; import org.apache.spark.SparkContext; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import org.apache.spark.storage.StorageLevel; import org.apache.spark.SparkConf; import org.apache.spark.storage.StorageLevel; public class PiAverage implements Serializable { public static void main(String[] args) { PiAverage pa = new PiAverage(); pa.go(); } public void go() { // should make a parameter like all these finals should be // int slices = (args.length == 1) ? Integer.parseInt(args[0]) : 2; final int SLICES = 16; // how many "darts" are thrown at the circle to get one single Pi estimate final int HOW_MANY_DARTS = 1000000; // how many "dartboards" to collect to average the Pi estimate, which we hope converges on the real Pi final int HOW_MANY_ESTIMATES = 1000; SparkConf sparkConf = new SparkConf().setAppName("PiAverage") .setMaster("local[4]"); JavaSparkContext jsc = new JavaSparkContext(sparkConf); // setup "dummy" ArrayList of size HOW_MANY_DARTS -- how many darts to throw List throwsList = new ArrayList(HOW_MANY_DARTS); for (int i = 0; i < HOW_MANY_DARTS; i++) { throwsList.add(i); } // setup "dummy" ArrayList of size HOW_MANY_ESTIMATES List numberOfEstimates = new ArrayList(HOW_MANY_ESTIMATES); for (int i = 0; i < HOW_MANY_ESTIMATES; i++) { numberOfEstimates.add(i); } JavaRDD dataSet = jsc.parallelize(throwsList, SLICES); long totalPi = dataSet.filter(new Function() { public Boolean call(Integer i) { double x = Math.random(); double y = Math.random(); if (x * x + y * y < 1) { return true; } else return false; } }).count(); System.out.println( "The average of " + HOW_MANY_DARTS + " estimates of Pi is " + 4 * totalPi / (double)HOW_MANY_DARTS); jsc.stop(); jsc.close(); } } 

让我从你的“背景问题”开始。 mapjoingroupBy等转换操作分为两类: 那些需要从所有分区输入数据并且那些不需要的数据。 像groupByjoin这样的操作需要一个shuffle,因为你需要将所有RDD分区中的所有记录用相同的键组合在一起(想想SQL JOINGROUP BY操作是如何工作的)。 另一方面, mapflatMapfilter等不需要改组,因为操作在前一步的分区的输入上工作正常。 它们一次处理单个记录,而不是具有匹配键的组。 因此,不需要改组。

这个背景是必要的,以了解“额外的地图”没有显着的开销。 将mapflatMap等后续操作“压缩”到一个“阶段”(当您在Spark Web控制台中查看作业的详细信息时显示),以便只实现一个RDD,即阶段结束。

关于你的第一个问题。 我不会为此使用累加器。 它们用于“边带”数据,例如计算解析的线数。 在这个例子中,您可以使用累加器来计算在半径为1的范围内对多少(x,y)对,作为示例。

Spark发行版中的JavaPiSpark示例与它一样好。 你应该研究它的工作原理。 它是大数据系统的正确数据流模型。 你可以使用“聚合器”。 在Javadocs中 ,单击“index”并查看aggaggregateaggregateByKey函数。 但是,它们不再是可以理解的,在这里也没有必要。 它们提供比map更大的灵活性然后reduce ,所以它们值得了解

您的代码的问题在于您有效地尝试告诉Spark要做什么,而不是表达您的意图并让Spark优化它的工作方式。

最后,我建议你购买和学习O’Reilly的“学习星火”。 它可以很好地解释内部细节,例如分段,并且它还显示了许多可以使用的示例代码。