SparkSession.createDataset()只允许List, RDD, or Seq – 但它不支持JavaPairRDD 。 因此,如果我有一个我想要创建Dataset的JavaPairRDD ,那么SparkSession.createDataset()限制的可行工作区SparkSession.createDataset()可以创建包含两个字段的包装器UserMap类: String和User 。 然后执行spark.createDataset(userMap, Encoders.bean(UserMap.class)); ?
在spark-java程序中,我需要读取一个配置文件并填充HashMap,我需要将其作为广播变量发布,以便它可以在所有数据节点上使用。 我需要在CustomInputFormat类中获取此广播变量的值,该类将在datanode中运行。 我如何在我的CustomInputFormat类中指定从特定广播变量中获取值,因为广播变量是在我的驱动程序中声明的? 我正在添加一些代码来解释它: 在这个场景1我在驱动程序本身使用它,即变量在同一个类中使用:这里我可以使用Broadcat.value()方法 > final Broadcast signPrefixes = > sc.broadcast(loadCallSignTable()); > JavaPairRDD countryContactCounts = contactCounts.mapToPair( > new PairFunction<Tuple2, String, Integer> (){ > public Tuple2 call(Tuple2 callSignCount) { > String sign = callSignCount._1(); > String country = lookupCountry(sign, signPrefixes.value()); > return new Tuple2(country, callSignCount._2()); > }}).reduceByKey(new SumInts()); 在场景2中,我将在自定义输入格式类中使用广播变量: 司机计划: > final JavaSparkContext sc= new […]
我们可以调用JavaSparkContext.wholeTextFiles并获取JavaPairRDD ,其中第一个String是文件名,第二个String是整个文件内容。 在Dataset API中是否有类似的方法,或者我所能做的就是将文件加载到JavaPairRDD然后转换为Dataset(这是有效的,但我正在寻找非RDD解决方案)。
我使用ElasticSearch-Hadoop Library从ElsticSearch获取数据。 JavaPairRDD<String, Map> esRDD = JavaEsSpark.esRDD(sc); 现在我有了JavaPairRDD。 我想在这个RDD上使用来自MLLib的随机森林。 所以我将它转换为JavaPairRDD.toRDD(esRDD)这将给我RDD。 使用RDD我再次转换为JavaRDD JavaRDD[] splits = (JavaRDD.fromRDD(JavaPairRDD.toRDD(esRDD), esRDD.classTag())).randomSplit(new double[] { 0.5, 0.5 }); JavaRDD trainingData = splits[0]; JavaRDD testData = splits[1]; 我想将trainingData和TestData传递给Random Forest算法,但它在编译时给出了转换exception。 类型不匹配:无法从JavaRDD [Tuple2 [String,Map [String,Object]]] []转换为JavaRDD [LabeledPoint] [] 添加方括号,小于和大于符号不起作用 任何人都可以建议我正确的铸造方式。 我是Spark Datastrucutres的新手。
我有一个下面的数据集,第一列是部门,第二列是工资。 我想按部门计算工资的平均值。 IT 2000000 HR 2000000 IT 1950000 HR 2200000 Admin 1900000 IT 1900000 IT 2200000 我在下面进行了操作 JavaPairRDD<String, Iterable> rddY = employees.groupByKey(); System.out.println(“” + rddY.collect()); 得到以下输出: [(IT,[2000000, 1950000, 1900000, 2200000]), (HR,[2000000, 2200000]), (Admin,[1900000])] 我需要的是 我想用spark RDD计算总平均值和部门平均值。 如何在spark中使用groupBy函数来计算平均值。
我正在构建一个接收RDD的generics函数,并对其进行一些计算。 由于我在输入RDD上运行多个计算,我想缓存它。 例如: public JavaRDD foo(JavaRDD r) { r.cache(); JavaRDD t1 = r… //Some calculations JavaRDD t2 = r… //Other calculations return t1.union(t2); } 我的问题是,因为r是给我的,它可能已经或可能没有被缓存。 如果它被缓存并且我再次调用缓存,那么spark会创建一个新的缓存层,这意味着在计算t1和t2 ,我将在缓存中有两个r实例吗? 或者火花是否意识到r被缓存并将忽略它?
我有一个看起来像这样的JavaRDD。 [ [A,8] [B,3] [C,5] [A,2] [B,8] … … ] 我希望我的结果是卑鄙的 [ [A,5] [B,5.5] [C,5] ] 如何仅使用Java RDD执行此操作。 PS:我想避免groupBy操作,所以我没有使用DataFrames。
我正在实现k-means ,我想创建新的质心。 但映射留下了一个元素! 但是,当K的值较小时,如15,它将正常工作。 基于该代码,我有: val K = 25 // number of clusters val data = sc.textFile(“dense.txt”).map( t => (t.split(“#”)(0), parseVector(t.split(“#”)(1)))).cache() val count = data.count() println(“Number of records ” + count) var centroids = data.takeSample(false, K, 42).map(x => x._2) do { var closest = data.map(p => (closestPoint(p._2, centroids), p._2)) var pointsGroup = closest.groupByKey() println(pointsGroup) pointsGroup.foreach […]
我想通过Spark in Java更新MongoDb中的特定集合。 我正在使用MongoDB Connector for Hadoop在Java中检索Apache Spark到MongoDb的信息。 在关注Sampo Niskanen关于通过Spark检索和保存MongoDb集合的优秀post后,我对更新集合感到困惑 。 MongoOutputFormat.java包含一个构造函数,它使用String [] updateKeys,我猜这是指一个可能的键列表,用于比较现有集合并执行更新。 但是,使用Spark的saveAsNewApiHadoopFile()方法和参数MongoOutputFormat.class ,我想知道如何使用该更新构造函数。 save.saveAsNewAPIHadoopFile(“file:///bogus”, Object.class, Object.class, MongoOutputFormat.class, config); 在此之前, MongoUpdateWritable.java用于执行集合更新。 从我在Hadoop上看到的例子来看,这通常是在mongo.job.output.value上mongo.job.output.value ,在Spark中可能是这样的: save.saveAsNewAPIHadoopFile(“file:///bogus”, Object.class, MongoUpdateWritable.class, MongoOutputFormat.class, config); 但是,我仍然想知道如何在MongoUpdateWritable.java指定更新密钥。 不可否认,作为一种hacky方式,我将对象的“_id”设置为我的文档的KeyValue,以便在执行保存时,集合将覆盖与_id具有相同KeyValue的文档。 JavaPairRDD analyticsResult; //JavaPairRdd of (mongoObject,result) JavaPairRDD save = analyticsResult.mapToPair(s -> { BSONObject o = (BSONObject) s._1; //for all keys, set _id to key:value_ […]
我有5个parititions-RDD和5个工人/执行者。 我怎样才能让Spark将每个RDD的分区保存在不同的worker(ip)上? 如果我说Spark可以在一个工作人员上保存几个分区,而在其他工作人员上有0个分区,我是对的吗? 我可以指定分区数,但Spark仍然可以在单个节点上缓存所有内容。 复制不是一种选择,因为RDD是巨大的。 我找到的解决方法 getPreferredLocations RDD的getPreferredLocations方法不提供100%保证该分区将存储在指定节点上。 Spark将在spark.locality.wait期间spark.locality.wait ,但之后Spark将在不同节点上缓存分区。 作为workarround ,您可以为spark.locality.wait设置非常高的值并覆盖getPreferredLocations 。 坏消息 – 你不能用Java做到这一点,你需要编写Scala代码。 至少Scala内部包含Java代码。 即: class NodeAffinityRDD[U: ClassTag](prev: RDD[U]) extends RDD[U](prev) { val nodeIPs = Array(“192.168.2.140″,”192.168.2.157″,”192.168.2.77”) override def getPreferredLocations(split: Partition): Seq[String] = Seq(nodeIPs(split.index % nodeIPs.length)) } SparkContext的makeRDD SparkContext有makeRDD方法 。 这种方法缺乏文献记载。 据我所知,我可以指定首选位置,而不是设置spark.locality.wait高值。 坏消息 – 首选位置将在第一次shuffle / join / cogroup操作中被丢弃 。 这两种方法都有一个太高spark.locality.wait缺点,如果一些节点不可用,可能会导致您的集群sturve。 PS更多背景 我有多达10,000个sales-XXX.parquet文件,每个文件代表不同地区不同商品的销售情况。 […]