Tag: apache spark

在同一Master下的Java和R Apps之间共享SparkContext

所以这是设置。 目前我已经初始化了两个Spark应用程序。 我需要在它们之间传递数据(最好通过共享的sparkcontext / sqlcontext,这样我就可以查询临时表)。 我目前使用Parquet文件进行dataframe传输,但是有可能采用其他方式吗? MasterURL指向同一个SparkMaster 通过终端启动Spark: /opt/spark/sbin/start-master.sh; /opt/spark/sbin/start-slave.sh spark://`hostname`:7077 Java App设置: JavaSparkContext context = new JavaSparkContext(conf); //conf = setMaster(MasterURL), 6G memory, and 4 cores. SQLContext sqlContext = new SQLContext(parentContext.sc()); 然后我稍后注册一个现有的框架 //existing dataframe to temptable df.registerTempTable(“table”); 和 SparkR sc <- sparkR.init(master='MasterURL', sparkEnvir=list(spark.executor.memory='6G', spark.cores.max='4') sqlContext <- sparkRSQL.init(sc) # attempt to get temptable df <- sql(sqlContext, […]

如何使用Java在Spark SQL中加入多列以在DataFrame中进行过滤

DataFrame a =包含列x,y,z,k DataFrame b =包含列x,y,a a.join(b,) ??? 我试过用 a.join(b,a.col(“x”).equalTo(b.col(“x”)) && a.col(“y”).equalTo(b.col(“y”),”inner”) 但Java正在抛出错误,说&&是不允许的。 有人可以帮忙吗? 谢谢

由于java.io.NotSerializableException:org.apache.spark.SparkContext,Spark作业失败

当我尝试在RDD[(Int,ArrayBuffer[(Int,Double)])]输入上应用方法(ComputeDwt)时,我面临exception。 我甚至使用extends Serialization选项来序列化spark中的对象。这是代码片段。 input:series:RDD[(Int,ArrayBuffer[(Int,Double)])] DWTsample extends Serialization is a class having computeDwt function. sc: sparkContext val kk:RDD[(Int,List[Double])]=series.map(t=>(t._1,new DWTsample().computeDwt(sc,t._2))) Error: org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: org.apache.spark.SparkContext org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: org.apache.spark.SparkContext at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503) at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441) at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149) 谁能告诉我可能是什么问题以及应该采取什么措施来解决这个问题呢?

在同一JVM中检测到多个SparkContext

根据我的上一个问题,我必须为我独特的JVM定义Multiple SparkContext。 我是用下一种方式做的(使用Java): SparkConf conf = new SparkConf(); conf.setAppName(“Spark MultipleContest Test”); conf.set(“spark.driver.allowMultipleContexts”, “true”); conf.setMaster(“local”); 之后我创建了下一个源代码: SparkContext sc = new SparkContext(conf); SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc); 后来在代码中: JavaSparkContext ctx = new JavaSparkContext(conf); JavaRDD testRDD = ctx.parallelize(AllList); 代码执行后,我得到了下一条错误消息: 16/01/19 15:21:08 WARN SparkContext: Multiple running SparkContexts detected in the same JVM! org.apache.spark.SparkException: Only one SparkContext may be running […]

无法在Spark中配置ORC属性

我正在使用Spark 1.6(Cloudera 5.8.2)并尝试以下方法来配置ORC属性。 但它不会影响输出。 下面是我试过的代码片段。 DataFrame dataframe = hiveContext.createDataFrame(rowData, schema); dataframe.write().format(“orc”).options(new HashMap(){ { put(“orc.compress”,”SNAPPY”); put(“hive.exec.orc.default.compress”,”SNAPPY”); put(“orc.compress.size”,”524288″); put(“hive.exec.orc.default.buffer.size”,”524288″); put(“hive.exec.orc.compression.strategy”, “COMPRESSION”); } }).save(“spark_orc_output”); 除此之外,我还尝试了在hive-site.xml和hiveContext对象中设置的这些属性。 输出上的hive –orcfiledump确认未应用配置。 Orcfiledump片段如下。 Compression: ZLIB Compression size: 262144

如何在Java / Kotlin中创建一个返回复杂类型的Spark UDF?

我正在尝试编写一个返回复杂类型的UDF: private val toPrice = UDF1<String, Map> { s -> val elements = s.split(” “) mapOf(“value” to elements[0], “currency” to elements[1]) } val type = DataTypes.createStructType(listOf( DataTypes.createStructField(“value”, DataTypes.StringType, false), DataTypes.createStructField(“currency”, DataTypes.StringType, false))) df.sqlContext().udf().register(“toPrice”, toPrice, type) 但是每次我用这个: df = df.withColumn(“price”, callUDF(“toPrice”, col(“price”))) 我得到一个神秘的错误: Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$28: (string) => struct) at […]

Java – Spark SQL DataFrame映射函数不起作用

在Spark SQL中,当我尝试在DataFrame上使用map函数时,我得到了以下错误。 DataFrame类型中的方法映射(Function1,ClassTag)不适用于参数(new Function(){}) 我也关注spark 1.3文档。 https://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection有任何解决方案吗? 这是我的测试代码。 // SQL can be run over RDDs that have been registered as tables. DataFrame teenagers = sqlContext.sql(“SELECT name FROM people WHERE age >= 13 AND age <= 19"); List teenagerNames = teenagers.map( new Function() { public String call(Row row) { return “Name: ” + row.getString(0); } }).collect();

如何在使用JAR运行spark-submit时将程序参数传递给main函数?

我知道这是一个微不足道的问题,但我在互联网上找不到答案。 我试图使用程序参数( String[] args )运行带有main函数的Java类。 但是,当我使用spark-submit和pass程序参数提交作业时,就像我一样 java -cp .jar 它没有阅读arg s。 我试过的命令是 bin/spark-submit analytics-package.jar –class full.package.name.ClassName 1234 someargument someArgument 这给了 Error: No main class set in JAR; please specify one with –class 当我尝试时: bin/spark-submit –class full.package.name.ClassName 1234 someargument someArgument analytics-package.jar 我明白了 Warning: Local jar /mnt/disk1/spark/1 does not exist, skipping. java.lang.ClassNotFoundException: com.relcy.analytics.query.QueryAnalytics at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) […]

Spark Kafka流媒体问题

我正在使用maven 我添加了以下依赖项 org.apache.spark spark-streaming_2.10 1.1.0 org.apache.spark spark-streaming-kafka_2.10 1.1.0 我还在代码中添加了jar SparkConf sparkConf = new SparkConf().setAppName(“KafkaSparkTest”); JavaSparkContext sc = new JavaSparkContext(sparkConf); sc.addJar(“/home/test/.m2/repository/org/apache/spark/spark-streaming-kafka_2.10/1.0.2/spark-streaming-kafka_2.10-1.0.2.jar”); JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(5000)); 它可以很好地解决任何错误,当我通过spark-submit运行时,我收到以下错误,非常感谢任何帮助。 谢谢你的时间。 bin/spark-submit –class “KafkaSparkStreaming” –master local[4] try/simple-project/target/simple-project-1.0.jar 线程“main”中的exceptionjava.lang.NoClassDefFoundError:org / apache / spark / streaming / kafka / KafkaUtils位于KafkaSparkStreaming.StarkStreamingTest(KafkaSparkStreaming.java:40),位于sun.reflect的KafkaSparkStreaming.main(KafkaSparkStreaming.java:23)。 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)中的NativeMethodAccessorImpl.invoke0(Native Method)位于sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)的java.lang.reflect.Method.invoke(方法。 java:606)org.apache.spark.deploy.SparkSubmit $ .launch(SparkSubmit.scala:303)atg.apache.spark.deploy.SparkSubmit $ .main(SparkSubmit.scala:55)at org.apache.spark .deploy.SparkSubmit.main(SparkSubmit.scala)引起:java.lang.ClassNotFoundException:java.net.URLClassLoader […]

在PySpark中运行自定义Java类

我正在尝试在PySpark中运行自定义HDFS阅读器类。 这个类是用Java编写的,我需要从PySpark访问它,无论是从shell还是使用spark-submit。 在PySpark中,我从SparkContext( sc._gateway )中检索sc._gateway 。 说我有一节课: package org.foo.module public class Foo { public int fooMethod() { return 1; } } 我试图将它打包到jar中并使用–jar选项传递给pyspark,然后运行: from py4j.java_gateway import java_import jvm = sc._gateway.jvm java_import(jvm, “org.foo.module.*”) foo = jvm.org.foo.module.Foo() 但我得到错误: Py4JError: Trying to call a package. 有人可以帮忙吗? 谢谢。