Tag: apache spark

我应该将变量保留为瞬态变量吗?

我一直在试验Apache Spark试图解决一些查询,如top-k,skyline等。 我创建了一个包装SparkConf和名为SparkContext的包装器。 这个类也实现了serializable,但由于SparkConf和JavaSparkContext不是可序列化的,所以类也不是。 我有一个类解决名为TopK的topK查询,该类实现了serializable,但该类还有一个不可序列化的SparkContext成员变量(由于上述原因)。 因此,每当我尝试从RDD中的.reduce()函数中执行TopK方法时,我都会收到exception。 我发现的解决方案是使SparkContext瞬态化。 我的问题是:我应该将SparkContext变量保持为瞬态还是我犯了一个大错误? SparkContext类: import java.io.Serializable; import org.apache.spark.SparkConf; import org.apache.spark.api.java.*; public class SparkContext implements Serializable { private final SparkConf sparConf; // this is not serializable private final JavaSparkContext sparkContext; // this is not either protected SparkContext(String appName, String master) { this.sparConf = new SparkConf(); this.sparConf.setAppName(appName); this.sparConf.setMaster(master); this.sparkContext = new JavaSparkContext(sparConf); […]

Java中Spark MLlib中的矩阵运算

这个问题是关于MLlib(Spark 1.2.1+)。 操作局部矩阵的最佳方法是什么(中等大小,低于100×100,因此不需要分发)。 例如,在计算数据集的SVD之后,我需要执行一些矩阵运算。 RowMatrix仅提供乘法function。 toBreeze方法返回一个DenseMatrix但API似乎不是Java友好的: public final That $plus(B b, UFunc.UImpl2 op) 在Spark + Java中,如何执行以下任何操作: 转置矩阵 加/减两个矩阵 裁剪矩阵 执行元素操作 等等 Javadoc RowMatrix: https ://spark.apache.org/docs/latest/api/java/org/apache/spark/mllib/linalg/distributed/RowMatrix.html RDD data = …; RowMatrix matrix = new RowMatrix(data); SingularValueDecomposition svd = matrix.computeSVD(15, true, 1e-9d); RowMatrix U = svd.U(); Vector s = svd.s(); Matrix V = svd.V(); //Example 1: How […]

元素的映射变坏了

我正在实现k-means ,我想创建新的质心。 但映射留下了一个元素! 但是,当K的值较小时,如15,它将正常工作。 基于该代码,我有: val K = 25 // number of clusters val data = sc.textFile(“dense.txt”).map( t => (t.split(“#”)(0), parseVector(t.split(“#”)(1)))).cache() val count = data.count() println(“Number of records ” + count) var centroids = data.takeSample(false, K, 42).map(x => x._2) do { var closest = data.map(p => (closestPoint(p._2, centroids), p._2)) var pointsGroup = closest.groupByKey() println(pointsGroup) pointsGroup.foreach […]

Spark:以编程方式获取集群核心数

我在纱线集群中运行我的火花应用程序。 在我的代码中,我使用数量可用的队列核心在我的数据集上创建分区: Dataset ds = … ds.coalesce(config.getNumberOfCores()); 我的问题:我如何通过编程方式而不是通过配置获得队列的可用数量?

如何使用Java中的Structured Streaming从Kafka反序列化记录?

我使用Spark 2.1 。 我试图使用Spark Structured Streaming从Kafka读取记录,反序列化它们并在之后应用聚合。 我有以下代码: SparkSession spark = SparkSession .builder() .appName(“Statistics”) .getOrCreate(); Dataset df = spark .readStream() .format(“kafka”) .option(“kafka.bootstrap.servers”, kafkaUri) .option(“subscribe”, “Statistics”) .option(“startingOffsets”, “earliest”) .load(); df.selectExpr(“CAST(value AS STRING)”) 我想要的是将value字段反序列化为我的对象而不是作为String 。 我有一个自定义反序列化器。 public StatisticsRecord deserialize(String s, byte[] bytes) 我怎么能用Java做到这一点? 我找到的唯一相关链接是https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2 .html ,但这是针对Scala的。

使用Mongo-Hadoop连接器通过Apache Spark更新MongoDb中的集合

我想通过Spark in Java更新MongoDb中的特定集合。 我正在使用MongoDB Connector for Hadoop在Java中检索Apache Spark到MongoDb的信息。 在关注Sampo Niskanen关于通过Spark检索和保存MongoDb集合的优秀post后,我对更新集合感到困惑 。 MongoOutputFormat.java包含一个构造函数,它使用String [] updateKeys,我猜这是指一个可能的键列表,用于比较现有集合并执行更新。 但是,使用Spark的saveAsNewApiHadoopFile()方法和参数MongoOutputFormat.class ,我想知道如何使用该更新构造函数。 save.saveAsNewAPIHadoopFile(“file:///bogus”, Object.class, Object.class, MongoOutputFormat.class, config); 在此之前, MongoUpdateWritable.java用于执行集合更新。 从我在Hadoop上看到的例子来看,这通常是在mongo.job.output.value上mongo.job.output.value ,在Spark中可能是这样的: save.saveAsNewAPIHadoopFile(“file:///bogus”, Object.class, MongoUpdateWritable.class, MongoOutputFormat.class, config); 但是,我仍然想知道如何在MongoUpdateWritable.java指定更新密钥。 不可否认,作为一种hacky方式,我将对象的“_id”设置为我的文档的KeyValue,以便在执行保存时,集合将覆盖与_id具有相同KeyValue的文档。 JavaPairRDD analyticsResult; //JavaPairRdd of (mongoObject,result) JavaPairRDD save = analyticsResult.mapToPair(s -> { BSONObject o = (BSONObject) s._1; //for all keys, set _id to key:value_ […]

如何在Spark中将JavaPairInputDStream转换为DataSet / DataFrame

我正在尝试从kafka接收流数据。 在此过程中,我能够接收流数据并将其存储到JavaPairInputDStream中 。 现在我需要分析这些数据,而不是将其存储到任何数据库中。所以我想将此JavaPairInputDStream转换为DataSet或DataFrame 到目前为止我尝试的是: import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalog.Function; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.AbstractJavaDStreamLike; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaPairInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.kafka.KafkaUtils; import kafka.serializer.StringDecoder; import scala.Tuple2; //Streaming Working […]

Spark Combinebykey JAVA lambda表达式

我想使用lambda函数来计算( JavaPairRDD pairs )的密钥的平均值。 出于这个原因,我开发了以下代码: java.util.function.Function<Double, Tuple2> createAcc = x -> new Tuple2(x, 1); BiFunction<Tuple2, Double, Tuple2> addAndCount = (Tuple2 x, Double y) -> { return new Tuple2(x._1()+y, x._2()+1 ); }; BiFunction<Tuple2, Tuple2, Tuple2> combine = (Tuple2 x, Tuple2 y) -> { return new Tuple2(x._1()+y._1(), x._2()+y._2() ); }; JavaPairRDD<Integer, Tuple2> avgCounts = pairs.combineByKey(createAcc, addAndCount, combine); […]

实现java UDF并从pyspark调用它

我需要创建一个在pyspark python中使用的UDF,它使用java对象进行内部计算。 如果它是一个简单的python我会做类似的事情: def f(x): return 7 fudf = pyspark.sql.functions.udf(f,pyspark.sql.types.IntegerType()) 并使用以下方式调用: df = sqlContext.range(0,5) df2 = df.withColumn(“a”,fudf(df.id)).show() 但是,我需要的函数的实现是在java而不是在python中。 我需要以某种方式包装它,所以我可以从python中以类似的方式调用它。 我的第一个尝试是实现java对象,然后将其包装在pyspark中的python中并将其转换为UDF。 因序列化错误而失败。 Java代码: package com.test1.test2; public class TestClass1 { Integer internalVal; public TestClass1(Integer val1) { internalVal = val1; } public Integer do_something(Integer val) { return internalVal; } } pyspark代码: from py4j.java_gateway import java_import from pyspark.sql.functions import […]

在火花环境中的Uima Ruta Out of Memory问题

我在apache spark上运行UIMA应用程序。 UIMA RUTA有数百万页需要批量处理才能进行计算。 但是有一段时间我面临内存exception。它会在成功处理2000页的时候抛出exception,但有些时候会在500页上失败。 应用日志 Caused by: java.lang.OutOfMemoryError: Java heap space at org.apache.uima.internal.util.IntArrayUtils.expand_size(IntArrayUtils.java:57) at org.apache.uima.internal.util.IntArrayUtils.ensure_size(IntArrayUtils.java:39) at org.apache.uima.cas.impl.Heap.grow(Heap.java:187) at org.apache.uima.cas.impl.Heap.add(Heap.java:241) at org.apache.uima.cas.impl.CASImpl.ll_createFS(CASImpl.java:2844) at org.apache.uima.cas.impl.CASImpl.createFS(CASImpl.java:489) at org.apache.uima.cas.impl.CASImpl.createAnnotation(CASImpl.java:3837) at org.apache.uima.ruta.rule.RuleMatch.getMatchedAnnotations(RuleMatch.java:172) at org.apache.uima.ruta.rule.RuleMatch.getMatchedAnnotationsOf(RuleMatch.java:68) at org.apache.uima.ruta.rule.RuleMatch.getLastMatchedAnnotation(RuleMatch.java:73) at org.apache.uima.ruta.rule.ComposedRuleElement.mergeDisjunctiveRuleMatches(ComposedRuleElement.java:330) at org.apache.uima.ruta.rule.ComposedRuleElement.continueMatch(ComposedRuleElement.java:213) at org.apache.uima.ruta.rule.ComposedRuleElement.continueOwnMatch(ComposedRuleElement.java:362) at org.apache.uima.ruta.rule.ComposedRuleElement.fallbackContinue(ComposedRuleElement.java:459) at org.apache.uima.ruta.rule.ComposedRuleElement.continueMatch(ComposedRuleElement.java:225) at org.apache.uima.ruta.rule.ComposedRuleElement.continueOwnMatch(ComposedRuleElement.java:362) at org.apache.uima.ruta.rule.ComposedRuleElement.fallbackContinue(ComposedRuleElement.java:459) at org.apache.uima.ruta.rule.ComposedRuleElement.continueMatch(ComposedRuleElement.java:225) at org.apache.uima.ruta.rule.ComposedRuleElement.continueOwnMatch(ComposedRuleElement.java:362) at org.apache.uima.ruta.rule.ComposedRuleElement.fallbackContinue(ComposedRuleElement.java:459) at org.apache.uima.ruta.rule.ComposedRuleElement.continueMatch(ComposedRuleElement.java:225) at […]