在Apache spark中,使用mapPartitions和组合使用广播变量和map之间的区别是什么

在Spark中,我们使用广播变量使每台机器只读变量的副本。 我们通常在闭包之外创建一个广播变量(例如闭包所需的查找表)以提高性能。

我们还有一个名为mapPartitions的spark转换运算符,它试图实现相同的function(使用共享变量来提高性能)。 例如,在mapPartitions中,我们可以为每个分区共享数据库连接。

那么这两者有什么区别? 我们可以互换地使用它来共享变量吗?

broadcast用于将对象发送到每个工作节点。 此对象将在该节点上的所有分区之间共享(并且对于集群中的每个节点,值/ ie对象都相同)。 广播的目标是在工作节点上的许多不同任务/分区中使用相同数据时节省网络成本。

相比之下, mapPartitions是RDD上可用的方法,并且仅在分区上像map一样工作。 是的,您可以定义新对象,例如jdbc连接,然后对每个分区都是唯一的。 但是,您不能在不同的分区之间共享它,而在不同的节点之间分享它们。

虽然KrisP提供的答案突出了所有重要的差异,但我认为值得注意的是mapPartitions只是高级转换背后的低级构建块,而不是实现共享状态的方法。

尽管mapPartitions可用于使共享喜欢的状态显式,但它在技术上不共享(其生命周期仅限于mapPartitions闭包),并且还有其他方法可以实现它。 特别是,在闭包内引用的变量在分区内共享。 为了说明让我们与单身人士玩一点:

 object DummySharedState { var i = 0L def get(x: Any) = { i += 1L i } } sc.parallelize(1 to 100, 1).map(DummySharedState.get).max // res3: Long = 100 sc.parallelize(1 to 100, 2).map(DummySharedState.get).max // res4: Long = 50 sc.parallelize(1 to 100, 50).map(DummySharedState.get).max // res5: Long = 2 

和PySpark中的类似事情:

  • 单例模块dummy_shared_state.py

     i = 0 def get(x): global i i += 1 return i 
  • 主脚本:

     from pyspark import SparkConf, SparkContext import dummy_shared_state master = "spark://..." conf = (SparkConf() .setMaster(master) .set("spark.python.worker.reuse", "false")) sc.addPyFile("dummy_shared_state.py") sc.parallelize(range(100), 1).map(dummy_shared_state.get).max() ## 100 sc.parallelize(range(100), 2).map(dummy_shared_state.get).max() ## 50 

请注意, spark.python.worker.reuse选项设置为false。 如果保持默认值,您实际上会看到以下内容:

 sc.parallelize(range(100), 2).map(dummy_shared_state.get).max() ## 50 sc.parallelize(range(100), 2).map(dummy_shared_state.get).max() ## 100 sc.parallelize(range(100), 2).map(dummy_shared_state.get).max() ## 150 

在一天结束时,您必须区分三种不同的东西:

  • 广播变量,旨在通过在工作人员上保留变量的副本而不是随每个任务运送变量来减少网络流量和内存占用
  • 在闭包之外定义并在闭包内引用的变量,必须随每个任务一起提供并为此任务共享
  • 在闭包内定义的变量是不共享的

最重要的是,有一些Python特定的陷阱与持久解释器的使用有关。

在变量生命周期中, mapfilter或其他变换)和mapPartitions之间仍然没有实际区别。