Spark – foreach Vs foreachPartitions何时使用什么?
我想知道foreachPartitions
是否会产生更好的性能,因为更高的并行度,与foreach
方法相比,考虑到我正在流经RDD
以便对累加器变量执行一些求和的情况。
foreach
auto在许多节点上运行循环。
但是,有时您希望在每个节点上执行某些操作。 例如,建立与数据库的连接。 您不能只建立连接并将其传递给foreach
函数:连接仅在一个节点上进行。
因此,使用foreachPartition
,您可以在运行循环之前在每个节点上建立数据库连接。
foreach
和foreachPartitions
之间确实没有那么大的区别。 在封面下, foreach
所做的就是使用提供的函数调用迭代器的foreach
。 foreachPartition
只是让你有机会在迭代器的循环之外做一些事情,通常是一些昂贵的事情,如启动数据库连接或沿着这些线路的东西。 因此,如果你没有为每个节点的迭代器做一次可以完成的任何事情并在整个过程中重复使用,那么我建议使用foreach
来提高清晰度并降低复杂性。
foreach
和foreachPartitions
是行动。
foreach(function):单位
用于调用具有副作用的操作的通用函数。 对于RDD中的每个元素,它调用传递的函数。 这通常用于操纵累加器或写入外部存储器。
注意:在foreach()
之外修改除累加器之外的变量可能会导致未定义的行为。 有关详细信息,请参阅了解闭包 。
例子 :
scala> val accum = sc.longAccumulator("My Accumulator") accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0) scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x)) ... 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s scala> accum.value res2: Long = 10
foreachPartition(function):单位
与
foreach()
类似,但不是为每个元素调用函数,而是为每个分区调用它。 该函数应该能够接受迭代器。 这比foreach()
更有效,因为它减少了函数调用的数量(就像mapPartitions
())。
示例foreachPartition
用法:对于每个分区,您要使用一个数据库连接(每个分区块内部),这是使用scala完成它的方法的示例用法。
df.repartition(numofpartitionsyouwant) // numPartitions ~ number of simultaneous DB connections you can planning to give... def insertToTable(sqlDatabaseConnectionString: String, sqlTableName: String): Unit = { val tableHeader: String = dataFrame.columns.mkString(",") dataFrame.foreachPartition { partition => //NOTE : EACH PARTITION ONE CONNECTION (more better way is to use connection pools) val sqlExecutorConnection: Connection = DriverManager.getConnection(sqlDatabaseConnectionString) //Batch size of 1000 is used since some databases cant use batch size more than 1000 for ex : Azure sql partition.grouped(1000).foreach { group => val insertString: scala.collection.mutable.StringBuilder = new scala.collection.mutable.StringBuilder() group.foreach { record => insertString.append("('" + record.mkString(",") + "'),") } sqlExecutorConnection.createStatement() .executeUpdate(f"INSERT INTO [$sqlTableName] ($tableHeader) VALUES " + insertString.stripSuffix(",")) } sqlExecutorConnection.close() // close the connection so that connections wont exhaust. } }
累加器样本片段可以用它来玩…你可以通过它来测试性能
test(“Foreach - Spark”){ import spark.implicits._ var accum = sc.longAccumulator sc.parallelize(Seq(1,2,3))。foreach(x => accum.add(x)) 断言(accum.value == 6L) } test(“Foreach partition - Spark”){ import spark.implicits._ var accum = sc.longAccumulator sc.parallelize(Seq(1,2,3))。foreachPartition(x => x.foreach(accum.add(_))) 断言(accum.value == 6L) }
结论:
对分区的
foreachPartition
操作显然它比foreach
更好
经验法则:
当您访问昂贵的资源(如数据库连接等)时,应使用
foreachPartition
。这将为每个分区初始化一个而不是每个元素一个(foreach
)。 当涉及蓄能器时,您可以通过上述测试方法测量性能,这对于蓄能器也应该更快。
另外……看看map vs mappartitions有相似的概念,但它们是转换。
foreachPartition
并不意味着它是每个节点活动,而是针对每个分区执行,并且与节点数相比,您可能有大量分区,在这种情况下,您的性能可能会降低。 如果您打算在节点级别执行活动, 此处解释的解决方案可能很有用,尽管我没有对其进行测试
foreachPartition
仅在您迭代通过分区聚合的数据时才有用。
一个很好的例子是每个用户处理点击流。 每次完成用户的事件流时,您都希望清除计算缓存,但要将其保存在同一用户的记录之间,以便计算某些用户行为洞察。