如何在YARN Spark作业中设置环境变量?
我试图通过使用带有newAPIHadoopRDD的AccumuloInputFormat从Apache Spark作业(用Java编写)访问Accumulo 1.6 。 为了做到这一点,我必须通过调用setZooKeeperInstance
方法告诉AccumuloInputFormat
在哪里找到ZooKeeper。 此方法采用ClientConfiguration
对象,该对象指定各种相关属性。
我正在通过调用静态loadDefault
方法创建我的ClientConfiguration
对象。 此方法应该在client.conf
文件的各个位置查看以从中加载其默认值。 应该看的其中一个地方是$ACCUMULO_CONF_DIR/client.conf
。
因此,我试图设置ACCUMULO_CONF_DIR
环境变量,使其在Spark运行作业时可见(作为参考,我试图在yarn-cluster
部署模式下运行)。 我还没有找到成功的方法。
到目前为止,我已经尝试过:
- 在
setExecutorEnv("ACCUMULO_CONF_DIR", "/etc/accumulo/conf")
调用setExecutorEnv("ACCUMULO_CONF_DIR", "/etc/accumulo/conf")
- 在
spark-env.sh
导出spark-env.sh
- 在
spark-defaults.conf
设置spark.executorEnv.ACCUMULO_CONF_DIR
他们都没有工作。 当我在调用setZooKeeperInstance
之前打印环境时,不会出现ACCUMULO_CONF_DIR
。
如果它是相关的,我正在使用CDH5版本的所有东西。
这是我正在尝试做的一个例子(为简洁而省略了导入和exception处理):
public class MySparkJob { public static void main(String[] args) { SparkConf sparkConf = new SparkConf(); sparkConf.setAppName("MySparkJob"); sparkConf.setExecutorEnv("ACcUMULO_CONF_DIR", "/etc/accumulo/conf"); JavaSparkContext sc = new JavaSparkContext(sparkConf); Job accumuloJob = Job.getInstance(sc.hadoopConfiguration()); // Foreach loop to print environment, shows no ACCUMULO_CONF_DIR ClientConfiguration accumuloConfiguration = ClientConfiguration.loadDefault(); AccumuloInputFormat.setZooKeeperInstance(accumuloJob, accumuloConfiguration); // Other calls to AccumuloInputFormat static functions to configure it properly. JavaPairRDD accumuloRDD = sc.newAPIHadoopRDD(accumuloJob.getConfiguration(), AccumuloInputFormat.class, Key.class, Value.class); } }
所以我在写这个问题时找到了答案(抱歉,信誉求职者)。 问题是CDH5使用Spark 1.0.0,而我正在通过YARN运行该作业。 显然,YARN模式不关注执行程序环境,而是使用环境变量SPARK_YARN_USER_ENV
来控制其环境。 因此,确保SPARK_YARN_USER_ENV
包含ACCUMULO_CONF_DIR=/etc/accumulo/conf
,并使ACCUMULO_CONF_DIR
在问题源示例中指定点的环境中可见。
独立模式和YARN模式工作方式的差异导致了SPARK-1680 ,在Spark 1.1.0中报告已修复。
- Spark提交失败,包含java.lang.NoSuchMethodError:scala.Predef $。$ conforms()Lscala / Predef $$ less $ colon $ less;
- Spark – 任务不可序列化:如何使用调用外部类/对象的复杂映射闭包?
- Spark:从具有不同内存/核心配置的单个JVM作业同时启动
- 将分析数据从Spark插入Postgres
- 缓存()/ persist()的apache-spark内存消耗
- Avro Schema引发StructType
- 从Apache Spark SQL中的用户定义聚合函数(UDAF)返回多个数组
- 如何在Java Spark RDD上执行标准偏差和平均操作?
- 如何使用spark处理一系列hbase行?