如何使用Java中的Structured Streaming从Kafka反序列化记录?

我使用Spark 2.1

我试图使用Spark Structured Streaming从Kafka读取记录,反序列化它们并在之后应用聚合。

我有以下代码:

SparkSession spark = SparkSession .builder() .appName("Statistics") .getOrCreate(); Dataset df = spark .readStream() .format("kafka") .option("kafka.bootstrap.servers", kafkaUri) .option("subscribe", "Statistics") .option("startingOffsets", "earliest") .load(); df.selectExpr("CAST(value AS STRING)") 

我想要的是将value字段反序列化为我的对象而不是作为String

我有一个自定义反序列化器。

 public StatisticsRecord deserialize(String s, byte[] bytes) 

我怎么能用Java做到这一点?


我找到的唯一相关链接是https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2 .html ,但这是针对Scala的。

定义JSON消息的模式。

 StructType schema = DataTypes.createStructType(new StructField[] { DataTypes.createStructField("Id", DataTypes.IntegerType, false), DataTypes.createStructField("Name", DataTypes.StringType, false), DataTypes.createStructField("DOB", DataTypes.DateType, false) }); 

现在阅读下面的消息。 MessageData是JSON消息的JavaBean。

 Dataset df = spark .readStream() .format("kafka") .option("kafka.bootstrap.servers", kafkaUri) .option("subscribe", "Statistics") .option("startingOffsets", "earliest") .load() .selectExpr("CAST(value AS STRING) as message") .select(functions.from_json(functions.col("message"),schema).as("json")) .select("json.*") .as(Encoders.bean(MessageData.class)); 

如果您的数据库中有自定义反序列化器,则在load后从Kafka获取的字节数上使用它。

 df.select("value") 

该行为您提供只有一个列value Dataset


我只使用Scala的Spark API,所以我在Scala中执行以下操作来处理“反序列化”案例:

 import org.apache.spark.sql.Encoders implicit val statisticsRecordEncoder = Encoders.product[StatisticsRecord] val myDeserializerUDF = udf { bytes => deserialize("hello", bytes) } df.select(myDeserializerUDF($"value") as "value_des") 

这应该会给你你想要的……在Scala中。 将它转换为Java是你的家庭练习:)

请注意,您的自定义对象必须具有可用的编码器,否则Spark SQL将拒绝将其对象放入数据集中。