Spark:以编程方式获取集群核心数

我在纱线集群中运行我的火花应用程序。 在我的代码中,我使用数量可用的队列核心在我的数据集上创建分区:

Dataset ds = ... ds.coalesce(config.getNumberOfCores()); 

我的问题:我如何通过编程方式而不是通过配置获得队列的可用数量?

有一些方法可以从Spark中获取集群中的执行程序数和核心数。 这是我过去使用过的一些Scala实用程序代码。 您应该能够轻松地将其适应Java。 有两个关键的想法:

  1. worker的数量是执行者的数量减去1或sc.getExecutorStorageStatus.length - 1

  2. 通过在worker上执行java.lang.Runtime.getRuntime.availableProcessors ,可以获得每个worker的核心数。

其余的代码是样板,用于使用Scala implicits为SparkContext添加便捷方法。 我写了1.x年前的代码,这就是为什么它不使用SparkSession

最后一点:合并到多个核心通常是一个好主意,因为这可以在数据偏斜的情况下提高性能。 实际上,我使用1.5x到4x之间的任何位置,具体取决于数据的大小以及作业是否在共享集群上运行。

 import org.apache.spark.SparkContext import scala.language.implicitConversions class RichSparkContext(val sc: SparkContext) { def executorCount: Int = sc.getExecutorStorageStatus.length - 1 // one is the driver def coresPerExecutor: Int = RichSparkContext.coresPerExecutor(sc) def coreCount: Int = executorCount * coresPerExecutor def coreCount(coresPerExecutor: Int): Int = executorCount * coresPerExecutor } object RichSparkContext { trait Enrichment { implicit def enrichMetadata(sc: SparkContext): RichSparkContext = new RichSparkContext(sc) } object implicits extends Enrichment private var _coresPerExecutor: Int = 0 def coresPerExecutor(sc: SparkContext): Int = synchronized { if (_coresPerExecutor == 0) sc.range(0, 1).map(_ => java.lang.Runtime.getRuntime.availableProcessors).collect.head else _coresPerExecutor } }