Spark:以编程方式获取集群核心数
我在纱线集群中运行我的火花应用程序。 在我的代码中,我使用数量可用的队列核心在我的数据集上创建分区:
Dataset ds = ... ds.coalesce(config.getNumberOfCores());
我的问题:我如何通过编程方式而不是通过配置获得队列的可用数量?
有一些方法可以从Spark中获取集群中的执行程序数和核心数。 这是我过去使用过的一些Scala实用程序代码。 您应该能够轻松地将其适应Java。 有两个关键的想法:
-
worker的数量是执行者的数量减去1或
sc.getExecutorStorageStatus.length - 1
。 -
通过在worker上执行
java.lang.Runtime.getRuntime.availableProcessors
,可以获得每个worker的核心数。
其余的代码是样板,用于使用Scala implicits为SparkContext
添加便捷方法。 我写了1.x年前的代码,这就是为什么它不使用SparkSession
。
最后一点:合并到多个核心通常是一个好主意,因为这可以在数据偏斜的情况下提高性能。 实际上,我使用1.5x到4x之间的任何位置,具体取决于数据的大小以及作业是否在共享集群上运行。
import org.apache.spark.SparkContext import scala.language.implicitConversions class RichSparkContext(val sc: SparkContext) { def executorCount: Int = sc.getExecutorStorageStatus.length - 1 // one is the driver def coresPerExecutor: Int = RichSparkContext.coresPerExecutor(sc) def coreCount: Int = executorCount * coresPerExecutor def coreCount(coresPerExecutor: Int): Int = executorCount * coresPerExecutor } object RichSparkContext { trait Enrichment { implicit def enrichMetadata(sc: SparkContext): RichSparkContext = new RichSparkContext(sc) } object implicits extends Enrichment private var _coresPerExecutor: Int = 0 def coresPerExecutor(sc: SparkContext): Int = synchronized { if (_coresPerExecutor == 0) sc.range(0, 1).map(_ => java.lang.Runtime.getRuntime.availableProcessors).collect.head else _coresPerExecutor } }
- Spark sql如何在不丢失空值的情况下爆炸
- 无法连接到spark master:InvalidClassException:org.apache.spark.rpc.RpcEndpointRef; 本地类不兼容
- 类型不匹配:无法从Java Spark中的Iterator 转换
- Avro Schema引发StructType
- Java,Spark和Cassandra java.lang.ClassCastException:com.datastax.driver.core.DefaultResultSetFuture无法转换为阴影
- 如何在spark数据框中展平结构?
- BroadCast变量在Spark程序中发布
- 线程“main”中的exceptionorg.apache.spark.SparkException:此JVM中只能运行一个SparkContext(参见SPARK-2243)
- Spark Java中的移动平均线