Apache Spark:在Java中有效地使用mapPartitions

在当前早期发布的名为High Performance Spark的教科书中, Spark的开发人员注意到:

为了让Spark能够灵活地将一些记录溢出到磁盘,重要的是在mapPartitions中表示你的函数,这样你的函数就不会强制将整个分区加载到内存中(例如隐式转换为列表)。 迭代器有很多方法可以编写函数式转换,或者你可以构造自己的自定义迭代器。 当转换直接获取并返回迭代器而不强制它通过另一个集合时,我们称之为迭代器到迭代器的转换。

但是,教科书缺乏使用mapPartitions或类似方法变体的好例子。 并且在线存在很少的好代码示例 – 其中大多数是Scala。 例如,我们使用mapPartitions编写的mapPartitions看到这个Scala代码如何将列添加到mapPartitions中的org.apache.spark.sql.Row中 。

 def transformRows(iter: Iterator[Row]): Iterator[Row] = iter.map(transformRow) sqlContext.createDataFrame(df.rdd.mapPartitions(transformRows), newSchema).show 

不幸的是,Java没有提供像iter.map(...)那样好的迭代器。 所以它引出了一个问题,如何有效地使用mapPartitions的迭代器到迭代器转换而不将RDD作为列表完全溢出到磁盘?

 JavaRDD collection = prevCollection.mapPartitions((Iterator iter) -> { ArrayList out = new ArrayList(); while(iter.hasNext()) { InObj current = iter.next(); out.add(someChange(current)); } return out.iterator(); }); 

这似乎是在Java示例中使用mapPartitions的一般语法,但我不知道这将是多么有效,假设你有一个拥有数万条记录的JavaRDD (甚至更多…因为,Spark是对于大数据)。 你最终会得到迭代器中所有对象的列表,只是将它转回迭代器(这就要求某种地图函数在这里效率更高)。

注意 :虽然使用mapPartitions这8行代码可以写成带有mapflatMap 1行,但我有意使用mapPartitions来利用它对每个分区而不是RDD每个元素进行操作的事实。

请问有什么想法吗?

防止强制整个分区“实现”的一种方法是将Iterator转换为Stream,然后使用Stream的functionAPI(例如map函数)。

如何将迭代器转换为流? 提出了一些将Iterator转换为Stream好方法,因此我们可以选择其中一个选项:

 rdd.mapPartitions((Iterator iter) -> { Iterable iterable = () -> iter; return StreamSupport.stream(iterable.spliterator(), false) .map(s -> transformRow(s)) // or whatever transformation .iterator(); }); 

哪个应该是“Itrator-to-Iterator”转换,因为所有使用的中间API( IterableStream )都是懒惰的评估。

编辑 :我自己没有测试过,但是OP评论说,“通过在列表中使用Stream,没有效率提升”。 我不知道为什么会这样,而且我不知道这一般是否属实,但值得一提。