Apache Spark:在Java中有效地使用mapPartitions
在当前早期发布的名为High Performance Spark的教科书中, Spark的开发人员注意到:
为了让Spark能够灵活地将一些记录溢出到磁盘,重要的是在
mapPartitions
中表示你的函数,这样你的函数就不会强制将整个分区加载到内存中(例如隐式转换为列表)。 迭代器有很多方法可以编写函数式转换,或者你可以构造自己的自定义迭代器。 当转换直接获取并返回迭代器而不强制它通过另一个集合时,我们称之为迭代器到迭代器的转换。
但是,教科书缺乏使用mapPartitions
或类似方法变体的好例子。 并且在线存在很少的好代码示例 – 其中大多数是Scala。 例如,我们使用mapPartitions
编写的mapPartitions看到这个Scala代码如何将列添加到mapPartitions中的org.apache.spark.sql.Row中 。
def transformRows(iter: Iterator[Row]): Iterator[Row] = iter.map(transformRow) sqlContext.createDataFrame(df.rdd.mapPartitions(transformRows), newSchema).show
不幸的是,Java没有提供像iter.map(...)
那样好的迭代器。 所以它引出了一个问题,如何有效地使用mapPartitions
的迭代器到迭代器转换而不将RDD
作为列表完全溢出到磁盘?
JavaRDD collection = prevCollection.mapPartitions((Iterator iter) -> { ArrayList out = new ArrayList(); while(iter.hasNext()) { InObj current = iter.next(); out.add(someChange(current)); } return out.iterator(); });
这似乎是在Java示例中使用mapPartitions
的一般语法,但我不知道这将是多么有效,假设你有一个拥有数万条记录的JavaRDD
(甚至更多…因为,Spark是对于大数据)。 你最终会得到迭代器中所有对象的列表,只是将它转回迭代器(这就要求某种地图函数在这里效率更高)。
注意 :虽然使用mapPartitions
这8行代码可以写成带有map
或flatMap
1行,但我有意使用mapPartitions
来利用它对每个分区而不是RDD
每个元素进行操作的事实。
请问有什么想法吗?
防止强制整个分区“实现”的一种方法是将Iterator
转换为Stream,然后使用Stream
的functionAPI(例如map
函数)。
如何将迭代器转换为流? 提出了一些将Iterator
转换为Stream
好方法,因此我们可以选择其中一个选项:
rdd.mapPartitions((Iterator iter) -> { Iterable iterable = () -> iter; return StreamSupport.stream(iterable.spliterator(), false) .map(s -> transformRow(s)) // or whatever transformation .iterator(); });
哪个应该是“Itrator-to-Iterator”转换,因为所有使用的中间API( Iterable
, Stream
)都是懒惰的评估。
编辑 :我自己没有测试过,但是OP评论说,“通过在列表中使用Stream,没有效率提升”。 我不知道为什么会这样,而且我不知道这一般是否属实,但值得一提。