如何强制Spark执行代码?
我如何强制Spark执行对map的调用,即使它认为由于其懒惰的评估而不需要执行它?
我试图将cache()
与map调用放在一起,但仍然无法解决问题。 我的map方法实际上将结果上传到HDFS。 所以,它并非无用,但Spark认为它是。
简短回答:
要强制Spark执行转换,您需要获得结果。 有时简单的count
操作就足够了。
TL; DR:
好的,我们来看看RDD
操作 。
RDD
支持两种类型的操作:
- 转换 – 从现有数据集创建新数据集。
- actions – 在对数据集运行计算后将值返回给驱动程序。
例如, map
是一个转换,它通过一个函数传递每个数据集元素,并返回一个表示结果的新RDD。 另一方面, reduce
是一个使用某个函数聚合RDD的所有元素的操作,并将最终结果返回给驱动程序(尽管还有一个返回分布式数据集的并行reduceByKey
)。
Spark中的所有转换都是懒惰的 ,因为它们不会立即计算结果 。
相反,他们只记得应用于某些基础数据集的转换(例如文件)。 仅当操作需要将结果返回到驱动程序时才会计算转换 。 这种设计使Spark能够更有效地运行 – 例如,我们可以意识到通过map创建的数据集将用于reduce,并仅将reduce的结果返回给驱动程序,而不是更大的映射数据集。
默认情况下,每次对其执行操作时,都可以重新计算每个转换后的RDD
。 但是,您也可以使用persist
(或cache
)方法在内存中保留RDD
,在这种情况下,Spark会在群集上保留元素,以便在下次查询时更快地访问。 还支持在磁盘上保留RDD
或在多个节点上复制。
结论
要强制Spark执行对map的调用,您需要获得结果。 有时count
操作就足够了。
参考
- Spark编程指南 。
Spark 变换只描述了必须要做的事情。 要触发执行,您需要执行操作 。
在你的情况下,有一个更深层次的问题。 如果目标是创建某种副作用,比如在HDFS上存储数据,那么使用正确的方法就是foreach
。 它既是一个动作又具有干净的语义。 同样重要的是,与map
不同,它并不意味着参考透明度。
- Java中的“Lambdifying”scala函数
- 使用Java从另一个应用程序部署Apache Spark应用程序,这是最佳实践
- 带有DataFrame API的Apache Spark MLlib在createDataFrame()或read()时会产生java.net.URISyntaxException .csv(…)
- Spark Strutured Streaming自动将时间戳转换为本地时间
- Spark与Cassandra输入/输出
- Apache Spark需要5到6分钟才能从Cassandra中简单计算1亿行
- 使用–jars的spark-submit yarn-cluster不起作用?
- 在PySpark中运行自定义Java类
- Spark的Column.isin函数不带List