BroadCast变量在Spark程序中发布
在spark-java程序中,我需要读取一个配置文件并填充HashMap,我需要将其作为广播变量发布,以便它可以在所有数据节点上使用。
我需要在CustomInputFormat类中获取此广播变量的值,该类将在datanode中运行。 我如何在我的CustomInputFormat类中指定从特定广播变量中获取值,因为广播变量是在我的驱动程序中声明的?
我正在添加一些代码来解释它:
在这个场景1我在驱动程序本身使用它,即变量在同一个类中使用:这里我可以使用Broadcat.value()方法
> final Broadcast signPrefixes = > sc.broadcast(loadCallSignTable()); > JavaPairRDD countryContactCounts = contactCounts.mapToPair( > new PairFunction<Tuple2, String, Integer> (){ > public Tuple2 call(Tuple2 callSignCount) { > String sign = callSignCount._1(); > String country = lookupCountry(sign, signPrefixes.value()); > return new Tuple2(country, callSignCount._2()); > }}).reduceByKey(new SumInts());
在场景2中,我将在自定义输入格式类中使用广播变量:
司机计划:
> final JavaSparkContext sc= new > JavaSparkContext(sConf.setAppName("ParserSpark").setMaster("yarn-cluster")); > Broadcast broadcastVar = sc.broadcast(new int[] {1, 2, 3}); > > JavaPairRDD<NullWritable, ArrayList> baseRDD = > sc.newAPIHadoopFile(args[2], InputFormat.class, NullWritable.class, > ArrayList.class, conf);
InputFormat.class
> public class InputFormat extends FileInputFormat { > > @Override public RecordReader<NullWritable, ArrayList> > createRecordReader(InputSplit split, TaskAttemptContext context) > throws IOException, InterruptedException{ > //I want to get the Broadcast Variable Here -- How will I do it > > RecordReader reader = new RecordReader(); reader.initialize(split, context); return reader; } @Override > protected boolean isSplitable(JobContext context, Path file) { > return false; } }
我最近遇到了这个问题。 实际上很简单(几个小时后再……一个哈!)
创建一个新的Configuration,设置你的vars,并将它传递给newAPIHadoopFile函数稍微不同的实现。
从驱动程序(在这里使用Scala):
val myConf = new Configuration(); myConf.set("var1", v1) myConf.set("var2", v2) myConf.set("var3", v3) val yourFile = sc.newAPIHadoopFile("yourFilePath", classOf[MyFileInputFormat],classOf[org.apache.hadoop.io.Text], classOf[org.apache.hadoop.io.DoubleWritable],myConf)
从您的InputFormat或InputReader ..或任何有上下文的地方(这次是Java)
context.getConfiguration().get("var1");
或者可能
job.getConfiguration().get("var2");
您将在驱动程序上创建广播val bcVariable = sc.broadcast(myVariableToBroadcast)
w / val bcVariable = sc.broadcast(myVariableToBroadcast)
并稍后使用val bcVariable = sc.broadcast(myVariableToBroadcast)
访问它
- 如何在Java / Kotlin中创建一个返回复杂类型的Spark UDF?
- 如何为每个记录生成唯一ID
- 在Apache Spark中,我可以轻松地重复/嵌套SparkContext.parallelize吗?
- TaskSchedulerImpl:初始作业未接受任何资源;
- 将分析数据从Spark插入Postgres
- 在封闭范围内定义的局部变量日志必须是最终的或有效的最终
- 如何使用Java有效地读取Hadoop(HDFS)文件中的第一行?
- Spark – 任务不可序列化:如何使用调用外部类/对象的复杂映射闭包?
- 线程“main”中的exceptionorg.apache.spark.SparkException:此JVM中只能运行一个SparkContext(参见SPARK-2243)