强制分区存储在特定执行程序中

我有5个parititions-RDD和5个工人/执行者。 我怎样才能让Spark将每个RDD的分区保存在不同的worker(ip)上?

如果我说Spark可以在一个工作人员上保存几个分区,而在其他工作人员上有0个分区,我是对的吗? 我可以指定分区数,但Spark仍然可以在单个节点上缓存所有内容。

复制不是一种选择,因为RDD是巨大的。

我找到的解决方法

getPreferredLocations

RDD的getPreferredLocations方法不提供100%保证该分区将存储在指定节点上。 Spark将在spark.locality.wait期间spark.locality.wait ,但之后Spark将在不同节点上缓存分区。

作为workarround ,您可以为spark.locality.wait设置非常高的值并覆盖getPreferredLocations 。 坏消息 – 你不能用Java做到这一点,你需要编写Scala代码。 至少Scala内部包含Java代码。 即:

 class NodeAffinityRDD[U: ClassTag](prev: RDD[U]) extends RDD[U](prev) { val nodeIPs = Array("192.168.2.140","192.168.2.157","192.168.2.77") override def getPreferredLocations(split: Partition): Seq[String] = Seq(nodeIPs(split.index % nodeIPs.length)) } 

SparkContext的makeRDD

SparkContext有makeRDD方法 。 这种方法缺乏文献记载。 据我所知,我可以指定首选位置,而不是设置spark.locality.wait高值。 坏消息 – 首选位置将在第一次shuffle / join / cogroup操作中被丢弃 。


这两种方法都有一个太高spark.locality.wait缺点,如果一些节点不可用,可能会导致您的集群sturve。

PS更多背景

我有多达10,000个sales-XXX.parquet文件,每个文件代表不同地区不同商品的销售情况。 每个sales-XXX.parquet可以从几KB到几GB不等。 所有sales-XXX.parquet一起在HDFS上可能需要数十或数百GB。 我需要通过所有销售进行全文搜索。 我必须用Lucene sales-XXX.parquet索引每个sales-XXX.parquet 。 现在我有两个选择:

  1. 在Spark中保留Lucene索引。 已有解决方案 ,但看起来很可疑。 有没有更好的解决方案?
  2. 将Lucene索引保留在本地文件系统中。 比我可以map-reduce每个worker的索引查找结果。 但是这种方法要求每个工作节点保持相同数量的数据。 我怎样才能确保Spark在每个工作节点上保留相同数量的数据?