SparkContext setLocalProperties

作为这个问题的延续, 您能否告诉我可以从SparkContext.setLocalProperties更改哪些属性? 我可以更改核心,RAM等吗?

根据文档描述, localPropertieslocalPropertiesprotected[spark]属性,您可以通过该属性创建逻辑作业组。 另一方面,它们是Inheritable线程局部变量。 这意味着当在变量中维护的每个线程属性必须自动传输到任何创建的子线程时,它们优先于普通线程局部变量使用。当请求运行SparkContext时,将本地属性传递给工作者或者提交一个Spark工作,然后将它们传递给DAGScheduler

通常, Local properties用于通过spark.scheduler.pool per-thread属性将作业分组到FAIR作业调度程序中的spark.scheduler.pool ,并在方法SQLExecution.withNewExecutionId中将spark.sql.execution.idspark.sql.execution.id

我没有在独立的spark集群中分配线程局部属性的经验。 值得尝试检查它。

我用属性spark.executor.memory (可用的属性在这里 )进行了一些测试,实际上是在一个非常简单的本地Spark上,每个具有不同设置的两个线程似乎被限制在线程中,带有代码(可能在这篇文章的最后,不是你要部署到生产中的代码,做一些线程的交错,以确保它不是通过一些纯粹的调度运气,我得到以下输出(清理火花输出到我的控制台):

 Thread 1 Before sleeping mem: 512 Thread 2 Before sleeping mem: 1024 Thread 1 After sleeping mem: 512 Thread 2 After sleeping mem: 1024 

非常简洁地观察一个线程中的声明属性保留在所述线程内部,虽然我很确定它很容易导致荒谬的情况,所以在应用这些技术之前我仍然建议谨慎。

 public class App { private static JavaSparkContext sc; public static void main(String[] args) { SparkConf conf = new SparkConf().setMaster("local") .setAppName("Testing App"); sc = new JavaSparkContext(conf); SparkThread Thread1 = new SparkThread(1); SparkThread Thread2 = new SparkThread(2); ExecutorService executor = Executors.newFixedThreadPool(2); Future ThreadCompletion1 = executor.submit(Thread1); try { Thread.sleep(5000); } catch (InterruptedException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } Future ThreadCompletion2 = executor.submit(Thread2); try { ThreadCompletion1.get(); ThreadCompletion2.get(); } catch (InterruptedException | ExecutionException e) { // TODO Auto-generated catch block e.printStackTrace(); } } private static class SparkThread implements Runnable{ private int i = 1; public SparkThread(int i) { this.i = i; } @Override public void run() { int mem = 512; sc.setLocalProperty("spark.executor.memory", Integer.toString(mem * i)); JavaRDD input = sc.textFile("test" + i); FlatMapFunction tt = s -> Arrays.asList(s.split(" ")) .iterator(); JavaRDD words = input.flatMap(tt); System.out.println("Thread " + i + " Before sleeping mem: " + sc.getLocalProperty("spark.executor.memory")); try { Thread.sleep(7000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } //do some work JavaPairRDD counts = words.mapToPair(t -> new Tuple2(t, 1)) .reduceByKey((x, y) -> (int) x + (int) y); counts.saveAsTextFile("output" + i); System.out.println("Thread " + i + " After sleeping mem: " + sc.getLocalProperty("spark.executor.memory")); } } } 

LocalProperties提供了一种简单的机制,可以将(用户定义的)配置从驱动程序传递给执行程序。 您可以使用执行程序上的TaskContext来访问它们。 一个例子是SQL执行ID