Spark Local Mode – 所有作业仅使用一个CPU核心
我们使用在单个AWS EC2实例上以本地模式运行Spark Java
"local[*]"
但是,使用New Relic工具进行性能分析和简单的“顶级”显示,我们已经编写了三个不同的Java spark工作,我们的16个核心机器中只有一个CPU核心用过(我们也尝试过不同的AWS实例,但只有一个核心永远使用)。
Runtime.getRuntime().availableProcessors()
报告16个处理器, sparkContext.defaultParallelism()
报告16个处理器。
我查看了各种Stackoverflow本地模式问题,但似乎没有解决问题。
任何建议都非常感谢。
谢谢
编辑:过程
1)使用sqlContext从光盘(S3)使用com.databricks.spark.csv读取gzip压缩的CSV文件1到DataFrame DF1。
2)使用sqlContext从光盘(S3)使用com.databricks.spark.csv将gzip压缩的CSV文件2读入DataFrame DF2。
3)使用DF1.toJavaRDD()。mapToPair(返回元组的新映射函数)RDD1
4)使用DF2.toJavaRDD()。mapToPair(返回元组的新映射函数)RDD2
5)在RDD上调用union
6)将联合RDD上的reduceByKey()调用为“按键合并”,因此具有仅具有特定键的一个实例的元组>(因为同一个键出现在RDD1和RDD2中)。
7)调用.values()。map(新映射函数,它迭代提供的List中的所有项目,并根据需要合并它们以返回相同或更小长度的List
8)调用.flatMap()来获取RDD
9)使用sqlContext从DomainClass类型的平面地图创建DataFrame
10)使用DF.coalease(1).write()将DF作为gzip压缩写入S3。
我认为你的问题是你的CSV文件是gzip压缩的。 当Spark读取文件时,它会并行加载它们,但只有在文件编解码器可拆分*时才能执行此操作。 普通(非gzip)文本和镶木地板是可拆分的,以及基因组学(我的领域)中使用的bgzip
编解码器。 您的整个文件最终分别位于一个分区中。
尝试解压缩csv.gz文件并再次运行它。 我想你会看到更好的结果!
- 可分割格式意味着如果给定一个任意文件偏移量来开始读取,您可以在块中找到下一条记录的开头并对其进行解释。 Gzipped文件不可拆分。
编辑:我在我的机器上复制了这种行为。 在3G gzip压缩文本文件上使用sc.textFile
生成1个分区。