Spark中的并发作业执行

我使用了以下格式的输入数据:

0 1 2 3 4 5 … 14 Input Location: hdfs://localhost:9000/Input/datasource 

我使用以下代码片段将RDD保存为使用多个线程的文本文件:

 package org.apache.spark.examples; import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.avro.ipc.specific.Person; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import scala.Tuple2; class RunnableDemo implements Runnable { private Thread t; private String threadName; private String path; private JavaRDD javaRDD; // private JavaSparkContext javaSparkContext; RunnableDemo(String threadName,JavaRDD javaRDD,String path) { this.threadName=threadName; this.javaRDD=javaRDD; this.path=path; // this.javaSparkContext=javaSparkContext; } @Override public void run() { System.out.println("Running " + threadName ); try { this.javaRDD.saveAsTextFile(path); // System.out.println(this.javaRDD.count()); Thread.sleep(50); } catch (InterruptedException e) { System.out.println("Thread " + threadName + " interrupted."); } System.out.println("Thread " + threadName + " exiting."); // this.javaSparkContext.stop(); } public void start () { System.out.println("Starting " + threadName ); if (t == null) { t = new Thread (this, threadName); t.start (); } } } public class SparkJavaTest { public static void main(String[] args) { //Spark Configurations: SparkConf sparkConf=new SparkConf().setAppName("SparkJavaTest"); JavaSparkContext ctx=new JavaSparkContext(sparkConf); SQLContext sqlContext = new SQLContext(ctx); JavaRDD dataCollection=ctx.textFile("hdfs://yarncluster/Input/datasource"); List fields= new ArrayList(); fields.add(DataTypes.createStructField("Id", DataTypes.IntegerType,true)); JavaRDD rowRDD =dataCollection.map( new Function() { @Override public Row call(String record) throws Exception { String[] fields = record.split("\u0001"); return RowFactory.create(Integer.parseInt(fields[0].trim())); } }); StructType schema = DataTypes.createStructType(fields); DataFrame dataFrame =sqlContext.createDataFrame(rowRDD, schema); dataFrame.registerTempTable("data"); long recordsCount=dataFrame.count(); long splitRecordsCount=5; long splitCount =recordsCount/splitRecordsCount; List<JavaRDD> list1=new ArrayList<JavaRDD>(); for(int i=0;i="+ start +" AND Id < " + end); list1.add(temp.toJavaRDD()); } long length =list1.size(); int split=0; for (int i = 0; i < length; i++) { JavaRDD rdd1 =list1.get(i); JavaPairRDD rdd3=rdd1.cartesian(rdd1); JavaPairRDD rdd4=rdd3.filter( new Function<Tuple2,Boolean>() { public Boolean call(Tuple2 s) { Row line1=s._1; Row line2=s._2; long app1 = Integer.parseInt(line1.get(0).toString()); long app2 = Integer.parseInt(line2.get(0).toString()); if(app1<app2) { return true; } return false; } }); JavaRDD test=rdd4.map(new Function<Tuple2, String>() { @Override public String call(Tuple2 s) throws Exception { Row data1=s._1; Row data2=s._2; int x =Integer.parseInt(data1.get(0).toString()); int y =Integer.parseInt(data2.get(0).toString()); String result =x +","+ y+","+(x+y); return result; } }); RunnableDemo R =new RunnableDemo("Thread-"+split,test,"hdfs://yarncluster/GettingStarted/Output/"+split); R.start(); split++; R.start(); int index =i; while(index<length) { JavaRDD rdd2 =list1.get(index); rdd3=rdd1.cartesian(rdd2); rdd4=rdd3.filter( new Function<Tuple2,Boolean>() { public Boolean call(Tuple2 s) { Row line1=s._1; Row line2=s._2; long app1 = Integer.parseInt(line1.get(0).toString()); long app2 = Integer.parseInt(line2.get(0).toString()); if(app1<app2) { return true; } return false; } }); test=rdd4.map(new Function<Tuple2, String>() { @Override public String call(Tuple2 s) throws Exception { Row data1=s._1; Row data2=s._2; int x =Integer.parseInt(data1.get(0).toString()); int y =Integer.parseInt(data2.get(0).toString()); String result =x +","+ y+","+(x+y); return result; } }); R =new RunnableDemo("Thread-"+split,test,"hdfs://yarncluster/GettingStarted/Output/"+split); R.start(); split++; index++; } } } } 

在这种情况下,我遇到了以下exception

在此处输入图像描述

我尝试过以下链接中提供的解决方案

如何使用单个spark上下文在Apache Spark中运行并发作业(操作)

但是,我仍然无法解决这个问题。

你能指导我解决这个问题吗?

首先,您尝试使用多个线程在驱动程序节点上执行所有工作。 这并不是出于Spark的精神,因为您的案例中的每个工作单元都独立于其他单元,并且可以在不同的机器上执行。 你有一个玩具示例,但这对于大量数据来说非常重要。

更好的方法是使用mapPartitions东西将键范围发送给每个worker,让他们执行相应的SQL查询,然后保存结果,每个worker有一个线程。 这将使代码更清晰,更容易推理(一旦你习惯了RDD的工作方式)。 您显然需要为输入数据设置适当的并行度和分区数( 此处讨论)。

您的代码的直接问题是主线程启动其他线程,但不等待它们完成。 通常这会导致生成的线程与父节点一起终止(请参阅javadoc )。 请注意,在回答链接问题时,main函数在返回之前对生成的期货执行get()操作。

你能试试吗?

 SparkConf sparkConf=new SparkConf() .setAppName("SparkJavaTest") .set("spark.driver.allowMultipleContexts", "true"); 

我也是Spark编程的新手,但这在调度程序中运行多个Spark作业时帮助了我。 根据我的理解,我们不需要考虑线程或MR逻辑/执行计划,因为Spark会为您完成。