Tag: apache spark

Spark DataFrame并重命名多个列(Java)

有没有更好的方法在给定的SparkSQL DataFrame同时为所有或多个列添加前缀或重命名,而不是多次调用dataFrame.withColumnRenamed() ? 例如,如果我想检测更改(使用完全外连接)。 然后我留下两个具有相同结构的DataFrame 。

Spark SQL失败,因为“常量池已超过JVM限制0xFFFF”

我在EMR 4.6.0 + Spark 1.6.1上运行此代码: val sqlContext = SQLContext.getOrCreate(sc) val inputRDD = sqlContext.read.json(input) try { inputRDD.filter(“`first_field` is not null OR `second_field` is not null”).toJSON.coalesce(10).saveAsTextFile(output) logger.info(“DONE!”) } catch { case e : Throwable => logger.error(“ERROR” + e.getMessage) } 在saveAsTextFile的最后一个阶段,它失败并显示以下错误: 16/07/15 08:27:45 ERROR codegen.GenerateUnsafeProjection: failed to compile: org.codehaus.janino.JaninoRuntimeException: Constant pool has grown past JVM limit of 0xFFFF […]

java + spark:org.apache.spark.SparkException:作业已中止:任务不可序列化:java.io.NotSerializableException

我是新手,并试图运行示例JavaSparkPi.java,它运行良好,但因为我必须在另一个java中使用它我将所有东西从main复制到类中的方法并尝试调用主要方法,它说 org.apache.spark.SparkException:作业已中止:任务不可序列化:java.io.NotSerializableException 代码看起来像这样: public class JavaSparkPi { public void cal(){ JavaSparkContext jsc = new JavaSparkContext(“local”, “JavaLogQuery”); int slices = 2; int n = 100000 * slices; List l = new ArrayList(n); for (int i = 0; i < n; i++) { l.add(i); } JavaRDD dataSet = jsc.parallelize(l, slices); System.out.println(“count is: “+ dataSet.count()); dataSet.foreach(new VoidFunction(){ public […]

为什么Spark在本地模式下失败并且“无法获得broadcast_0的broadcast_0_piece0”?

我正在运行这个片段来对点的RDD进行排序,对RDD进行排序并从给定点获取K-最近点: def getKNN(sparkContext:SparkContext, k:Int, point2:Array[Double], pointsRDD:RDD[Array[Double]]): RDD[Array[Double]] = { val tuplePointDistanceRDD:RDD[(Double, Array[Double])] = pointsRDD.map(point => (DistanceUtils.euclidianDistance(point, point2), point)) sparkContext.parallelize(tuplePointDistanceRDD.sortBy(_._1).map(_._2).take(k)) } 在我的应用程序中只使用一个SparkContext并将其作为参数传递给我的函数,我得到一个org.apache.spark.SparkException: Failed to get broadcast_0_piece0 of broadcast_0在我调用sparkContext.parallelize(tuplePointDistanceRDD.sortBy(_._1).map(_._2).take(k))时, org.apache.spark.SparkException: Failed to get broadcast_0_piece0 of broadcast_0错误的sparkContext.parallelize(tuplePointDistanceRDD.sortBy(_._1).map(_._2).take(k))从point2获得KNN点。 我正在构建sparkContext因为这个片段如下: var sparkContext = new SparkContext(“local”, “”) 面对这种错误的可能原因是什么? 基本上这是我的独立spark环境的LOG,其中包含此错误的堆栈跟踪: 15/12/24 11:55:29 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@localhost:55731] 15/12/24 11:55:29 […]

将分析数据从Spark插入Postgres

我有Cassandra数据库,我通过Apache Spark使用SparkSQL分析数据。 现在我想将这些分析的数据插入到PostgreSQL中。 有没有办法直接实现这一点,除了使用PostgreSQL驱动程序(我使用postREST和驱动程序实现它我想知道是否有任何方法,如saveToCassandra() )?

如何使用spark处理一系列hbase行?

我正在尝试使用HBase作为spark的数据源。 因此,第一步是从HBase表创建RDD。 由于Spark使用hadoop输入格式,我可以通过创建rdd找到一种使用所有行的方法http://www.vidyasource.com/blog/Programming/Scala/Java/Data/Hadoop/Analytics/2014/01/25 / lighting-a-spark-with-hbase但我们如何为范围扫描创建RDD? 欢迎所有建议。

Apache Spark Streaming的失败集成测试

我一直试图找出我为Apache Spark项目编写的一些单元/集成测试的问题。 当使用Spark 1.1.1时,我的测试通过了。 当我尝试升级到1.4.0(也尝试过1.4.1)时,测试开始失败。 我已经设法将重现问题所需的代码减少到下面的小集成测试。 有趣的是,如果我在测试中注释掉@RunWith注释,那么测试就会正确传递。 显然我不需要@RunWith注释来进行这种减少测试,但真正的测试相当广泛地使用了模拟,所以我宁愿不必使用PowerMock。 package com.example; import org.apache.spark.SparkConf; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.powermock.modules.junit4.PowerMockRunner; @RunWith(PowerMockRunner.class) public class SampleTest { @Before public void setup() throws Exception { SparkConf conf = new SparkConf(false).setMaster(“local[2]”).setAppName(“My app”); JavaStreamingContext jsc = new JavaStreamingContext(conf, new Duration(1000)); } @Test public void exampleTest() { […]

并行读取S3中的多个文件(Spark,Java)

我看到了一些关于此的讨论,但还不太明白正确的解决方案:我想将S3中的几百个文件加载到RDD中。 我现在就是这样做的: ObjectListing objectListing = s3.listObjects(new ListObjectsRequest(). withBucketName(…). withPrefix(…)); List keys = new LinkedList(); objectListing.getObjectSummaries().forEach(summery -> keys.add(summery.getKey())); // repeat while objectListing.isTruncated() JavaRDD events = sc.parallelize(keys).flatMap(new ReadFromS3Function(clusterProps)); ReadFromS3Function使用AmazonS3客户端执行实际读取: public Iterator call(String s) throws Exception { AmazonS3 s3Client = getAmazonS3Client(properties); S3Object object = s3Client.getObject(new GetObjectRequest(…)); InputStream is = object.getObjectContent(); List lines = new LinkedList(); String str; try […]

如何在Java中的Apache Spark中将DataFrame转换为Dataset?

我可以很容易地将Scala中的DataFrame转换为Dataset: case class Person(name:String, age:Long) val df = ctx.read.json(“/tmp/persons.json”) val ds = df.as[Person] ds.printSchema 但在Java版本中我不知道如何将Dataframe转换为Dataset? 任何想法? 我的努力是: DataFrame df = ctx.read().json(logFile); Encoder encoder = new Encoder(); Dataset ds = new Dataset(ctx,df.logicalPlan(),encoder); ds.printSchema(); 但是编译器说: Error:(23, 27) java: org.apache.spark.sql.Encoder is abstract; cannot be instantiated 编辑(解决方案): 基于@Leet-Falcon的解决方案答案: DataFrame df = ctx.read().json(logFile); Encoder encoder = Encoders.bean(Person.class); Dataset ds = […]

用于行类型Spark数据集的编码器

我想在DataSet中为Row类型编写一个编码器,用于我正在进行的地图操作。 基本上,我不明白如何编写编码器。 以下是地图操作的示例: In the example below, instead of returning Dataset, I would like to return Dataset Dataset output = dataset1.flatMap(new FlatMapFunction() { @Override public Iterator call(Row row) throws Exception { ArrayList obj = //some map operation return obj.iterator(); } },Encoders.STRING()); 据我所知,编码器需要编写如下代码: Encoder encoder = new Encoder() { @Override public StructType schema() { return join.schema(); […]