Spark – 可以在JAVA中将MultiMap转换为DataFrame

我正在尝试将数十亿数据值的MultiMap转换为Spark DataFrame以运行计算,然后将结果写入cassandra表。

我从以下cassandra查询和循环生成多图。 我很乐意接受建议,如果有更好的方法来获取和操纵数据到DataFrame,就像我在循环中一样。

代码更新了答案:

//Build ResultSet from cassandra query for data manipulation. Statement stmt = new SimpleStatement("SELECT \"Power\",\"Bandwidth\",\"Start_Frequency\" FROM \"SB1000_49552019\".\"Measured_Value\";"); //Statement stmt = new SimpleStatement("SELECT power, bandwidth, start_frequency FROM model.reports;"); stmt.setFetchSize(1000); ResultSet results = session.execute(stmt); // Get the Variables from each Row of Cassandra Data Multimap data = LinkedListMultimap.create(); for (Row row : results){ // Column Names in Cassandra (Case Sensitive) start_frequency = row.getDouble("Start_Frequency"); power = row.getFloat("Power"); bandwidth = row.getDouble("Bandwidth"); // Create Channel Power Buckets, place information into prepared statement binding, write to cassandra. for(channel = 1.6000E8; channel = start_frequency) && (channel <= (start_frequency + bandwidth)) ) { data.put(channel, power); } // end if channel+=increment; } // end for } // end "row" for // Create Spark List for DataFrame List values = data.asMap().entrySet() .stream() .flatMap(x -> x.getValue() .stream() .map(y -> new Value(x.getKey(), y))) .collect(Collectors.toList()); // Create DataFrame and Calculate Results sqlContext.createDataFrame(sc.parallelize(values), Value.class).groupBy(col("channel")) .agg(min("power"), max("power"), avg("power")) .write().mode(SaveMode.Append) .option("table", "results") .option("keyspace", "model") .format("org.apache.spark.sql.cassandra").save(); } // end session } // End Compute public class Value implements Serializable { public Value(Double channel, Float power) { this.channel = channel; this.power = power; } Double channel; Float power; public void setChannel(Double channel) { this.channel = channel; } public void setPower(Float power) { this.power = power; } public Double getChannel() { return channel; } public Float getPower() { return power; } @Override public String toString() { return "[" +channel +","+power+"]"; } } 

示例multimap具有{Double} = [Float]类型,其中每个Double可能有多个Float项

 {1.50E8=[10, 20], 1.51E8=[-10, -13, -14, -15], 1.52E8=[-10, -11] 

我需要使用spark来获得每个的最小值,最大值,平均值。 例如,对于第一个,1.50ED将是最小10,最大20,平均15。

我已经拥有了可以使用的代码,一旦我可以在temptable中获取它并作为dataframe操作:

 queryMV.groupBy(col("channel")) .agg(min("power"), max("power"), avg("power")) .write().mode(SaveMode.Append) .option("table", "results") .option("keyspace", "model") .format("org.apache.spark.sql.cassandra").save(); 

我将非常感谢有关如何使用JAVA将multimap转换为DataFrame的一些提示。 我无法找到任何关于使用带有spark的multimaps的文档。

我目前正在使用一个执行初始查询的解决方案,并使用for循环将原始数据写入新表,然后我可以直接映射到temptable / dataframe,但这需要花费很多时间,因为我必须写入数十亿行cassandra在计算之前。 我想使用多图或类似的东西,直接转换为spark进行计算。

可以使用Java parallelize方法获取T列表或者使用parallelizePairs Tuple 。 所以你需要转换。 虽然createDataFrame只适用于RDD和Scala Seq ,但需要一个模式(bean或StructType)。

为了使它变得更有趣com.google.common.collect.ImmutableEntry不可序列化,因此您需要使用Java进行转换,因此除非您将转换逻辑移动到Java,否则Java-ficated版本的@Pankaj Arora解决方案将无法工作。 即

 public class Value implements Serializable { public Value(Double a, Float b) { this.a = a; this.b = b; } Double a; Float b; public void setA(Double a) { this.a = a; } public void setB(Float b) { this.b = b; } public Double getA() { return a; } public Float getB() { return b; } public String toString() { return "[" +a +","+b+"]"; } } Multimap data = LinkedListMultimap.create(); data.put(1d, 1f); data.put(1d, 2f); data.put(2d, 3f); List values = data.asMap().entrySet() .stream() .flatMap(x -> x.getValue() .stream() .map(y -> new Value(x.getKey(), y))) .collect(Collectors.toList()); sqlContext.createDataFrame(sc.parallelize(values), Value.class).show(); 

鉴于您的编辑,我将从关闭创建对象(而不是多图)。

 case class Output(a : Double ,b : Int ) val input = Map(1.50E8-> List(10, 20) , 1.51E8-> List( -10, -13, -14, -15 ), 1.52E8-> List(-10, -11)).toArray val inputRdd = sc.parallelize(input) val queryMV = inputRdd.flatMap(x=> x._2.map(y=> Output(x._1, y))).toDF