How to convert a JavaPairInputDStream into a DataSet/DataFrame in Spark

I am trying to receive streaming data from Kafka. In this process I am able to receive the streaming data and store it in a JavaPairInputDStream. Now I need to analyze this data without storing it in any database, so I want to convert this JavaPairInputDStream into a DataSet or DataFrame.

What I have tried so far:

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import kafka.serializer.StringDecoder;
import scala.Tuple2;

//Streaming working code
public class KafkaToSparkStreaming {

    public static void main(String arr[]) throws InterruptedException {
        SparkConf conf = new SparkConf();
        conf.set("spark.app.name", "SparkReceiver"); //The name of the application. This will appear in the UI and in log data.
        //conf.set("spark.ui.port", "7077"); //Port for the application's dashboard, which shows memory and workload data.
        conf.set("dynamicAllocation.enabled", "false"); //Scales the number of executors registered with this application up and down based on the workload.
        //conf.set("spark.cassandra.connection.host", "localhost"); //Cassandra host address/IP.
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); //For serializing objects that will be sent over the network or need to be cached in serialized form.
        //conf.setMaster("local");
        conf.set("spark.streaming.stopGracefullyOnShutdown", "true");

        JavaSparkContext sc = new JavaSparkContext(conf);
        //Create the streaming context with a 2-second batch size
        JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));

        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("zookeeper.connect", "localhost:2181"); //Makes all Kafka data for this cluster appear under a particular path.
        kafkaParams.put("group.id", "testgroup"); //String that uniquely identifies the group of consumer processes to which this consumer belongs.
        kafkaParams.put("metadata.broker.list", "localhost:9092"); //Lets the client find one or more brokers to determine the leader for each topic.
        kafkaParams.put("serializer.class", "kafka.serializer.StringEncoder"); //Serializer to use when preparing the message for transmission to the broker.
        kafkaParams.put("request.required.acks", "1"); //Requires an acknowledgement from the broker that the message was received.

        Set<String> topics = Collections.singleton("ny-2008.csv");

        //Create an input DStream receiving data directly from Kafka
        JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc,
                String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);

        //System.out.println(directKafkaStream);
        directKafkaStream.print();

        ssc.start();
        ssc.awaitTermination();
    }
}

Here is the complete working code using Spark 2.0.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import kafka.serializer.StringDecoder;
import scala.Tuple2;

public class KafkaToSparkStreaming {

    public static void main(String arr[]) throws InterruptedException {
        SparkConf conf = new SparkConf();
        conf.set("spark.app.name", "SparkReceiver"); //The name of the application. This will appear in the UI and in log data.
        //conf.set("spark.ui.port", "7077"); //Port for the application's dashboard, which shows memory and workload data.
        conf.set("dynamicAllocation.enabled", "false"); //Scales the number of executors registered with this application up and down based on the workload.
        //conf.set("spark.cassandra.connection.host", "localhost"); //Cassandra host address/IP.
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); //For serializing objects that will be sent over the network or need to be cached in serialized form.
        conf.setMaster("local");
        conf.set("spark.streaming.stopGracefullyOnShutdown", "true");

        JavaSparkContext sc = new JavaSparkContext(conf);
        //Create the streaming context with a 2-second batch size
        JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));

        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("zookeeper.connect", "localhost:2181"); //Makes all Kafka data for this cluster appear under a particular path.
        kafkaParams.put("group.id", "testgroup"); //String that uniquely identifies the group of consumer processes to which this consumer belongs.
        kafkaParams.put("metadata.broker.list", "localhost:9092"); //Lets the client find one or more brokers to determine the leader for each topic.
        kafkaParams.put("serializer.class", "kafka.serializer.StringEncoder"); //Serializer to use when preparing the message for transmission to the broker.
        kafkaParams.put("request.required.acks", "1"); //Requires an acknowledgement from the broker that the message was received.

        Set<String> topics = Collections.singleton("ny-2008.csv");

        //Create an input DStream receiving data directly from Kafka
        JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc,
                String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);

        //Extract the message value from each (key, value) pair into a JavaDStream<String>
        JavaDStream<String> msgDataStream = directKafkaStream.map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });

        //Convert each micro-batch RDD into a DataFrame
        msgDataStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
            @Override
            public void call(JavaRDD<String> rdd) {
                JavaRDD<Row> rowRDD = rdd.map(new Function<String, Row>() {
                    @Override
                    public Row call(String msg) {
                        Row row = RowFactory.create(msg);
                        return row;
                    }
                });
                //Create the schema: a single nullable string column named "Message"
                StructType schema = DataTypes.createStructType(
                        new StructField[] {DataTypes.createStructField("Message", DataTypes.StringType, true)});
                //Get the Spark 2.0 session
                SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
                Dataset<Row> msgDataFrame = spark.createDataFrame(rowRDD, schema);
                msgDataFrame.show();
            }
        });

        ssc.start();
        ssc.awaitTermination();
    }
}

class JavaSparkSessionSingleton {
    private static transient SparkSession instance = null;

    //Lazily create one SparkSession and reuse it across micro-batches
    public static SparkSession getInstance(SparkConf sparkConf) {
        if (instance == null) {
            instance = SparkSession
                    .builder()
                    .config(sparkConf)
                    .getOrCreate();
        }
        return instance;
    }
}
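The lazily initialized JavaSparkSessionSingleton ensures a single SparkSession is created once and reused for every micro-batch instead of being rebuilt per batch. Since the question asks about analyzing the data rather than storing it, one possible extension (a sketch only, not part of the original answer; the view name "messages" and the aggregation are just examples) is to register each batch's DataFrame as a temporary view and query it with Spark SQL, reusing the spark and msgDataFrame variables inside the foreachRDD block above:

//Sketch only: goes inside the foreachRDD call above, after msgDataFrame is created.
//"messages" is an example view name; the GROUP BY query is an example analysis.
msgDataFrame.createOrReplaceTempView("messages");
Dataset<Row> counts = spark.sql("SELECT Message, COUNT(*) AS cnt FROM messages GROUP BY Message");
counts.show();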

Technically speaking, a DStream is a sequence of RDDs. You don't convert the DStream itself into a DataFrame; instead you convert each RDD into a DataFrame/Dataset, as shown below (Scala code, please translate it to Java for your case):

stream.foreachRDD { rdd =>
  // toDF on a tuple RDD needs the SparkSession implicits in scope: import spark.implicits._
  val dataFrame = rdd.map { case (key, value) => (key, value) }.toDF("key", "value")
}
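A rough Java equivalent of that per-RDD conversion might look like the sketch below. It assumes the directKafkaStream, imports (plus org.apache.spark.api.java.JavaPairRDD), and JavaSparkSessionSingleton from the Spark 2.0 answer above; the column names "key" and "value" are just examples.

//Sketch: convert each micro-batch of (key, value) pairs into a DataFrame
directKafkaStream.foreachRDD(new VoidFunction<JavaPairRDD<String, String>>() {
    @Override
    public void call(JavaPairRDD<String, String> rdd) {
        //Map each Kafka (key, value) pair to a Row
        JavaRDD<Row> rowRDD = rdd.map(new Function<Tuple2<String, String>, Row>() {
            @Override
            public Row call(Tuple2<String, String> tuple2) {
                return RowFactory.create(tuple2._1(), tuple2._2());
            }
        });
        //One nullable string column per tuple element ("key"/"value" are example names)
        StructType schema = DataTypes.createStructType(new StructField[] {
                DataTypes.createStructField("key", DataTypes.StringType, true),
                DataTypes.createStructField("value", DataTypes.StringType, true)});
        //Reuse the singleton SparkSession from the answer above
        SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
        Dataset<Row> dataFrame = spark.createDataFrame(rowRDD, schema);
        dataFrame.show();
    }
});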