Tag: spark streaming

为什么启动StreamingContext失败并出现“IllegalArgumentException:要求失败:没有注册输出操作,所以无需执行”?

我正在尝试使用Twitter作为源执行Spark Streaming示例,如下所示: public static void main (String.. args) { SparkConf conf = new SparkConf().setAppName(“Spark_Streaming_Twitter”).setMaster(“local”); JavaSparkContext sc = new JavaSparkContext(conf); JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(2)); JavaSQLContext sqlCtx = new JavaSQLContext(sc); String[] filters = new String[] {“soccer”}; JavaReceiverInputDStream receiverStream = TwitterUtils.createStream(jssc,filters); jssc.start(); jssc.awaitTermination(); } 但我得到以下例外 Exception in thread “main” java.lang.AssertionError: assertion failed: No output streams […]

Spark Strutured Streaming自动将时间戳转换为本地时间

我有UTC和ISO8601的时间戳,但使用结构化流,它会自动转换为本地时间。 有没有办法阻止这种转换? 我想在UTC中使用它。 我正在从Kafka读取json数据,然后使用from_json Spark函数解析它们。 输入: {“Timestamp”:”2015-01-01T00:00:06.222Z”} 流: SparkSession .builder() .master(“local[*]”) .appName(“my-app”) .getOrCreate() .readStream() .format(“kafka”) … //some magic .writeStream() .format(“console”) .start() .awaitTermination(); 架构: StructType schema = DataTypes.createStructType(new StructField[] { DataTypes.createStructField(“Timestamp”, DataTypes.TimestampType, true),}); 输出: +——————–+ | Timestamp| +——————–+ |2015-01-01 01:00:…| |2015-01-01 01:00:…| +——————–+ 如您所见,小时数自行增加。 PS:我试着尝试使用from_utc_timestamp Spark函数,但没有运气。

如何更新火花流中的广播变量?

我相信,我有一个相对常见的火花流用例: 我有一个对象流,我想根据一些参考数据进行过滤 最初,我认为使用广播变量实现这是一件非常简单的事情: public void startSparkEngine { Broadcast refdataBroadcast = sparkContext.broadcast(getRefData()); final JavaDStream filteredStream = objectStream.filter(obj -> { final ReferenceData refData = refdataBroadcast.getValue(); return obj.getField().equals(refData.getField()); } filteredStream.foreachRDD(rdd -> { rdd.foreach(obj -> { // Final processing of filtered objects }); return null; }); } 但是,尽管很少, 我的参考数据会定期更改 我的印象是我可以在驱动程序上修改和重新广播我的变量,它会传播给每个worker,但Broadcast对象不是Serializable ,需要是final 。 我有什么替代品? 我能想到的三个解决方案是: 将引用数据查找移动到forEachPartition或forEachRdd ,以使其完全驻留在worker上。 但是,参考数据存在于REST API中,因此我还需要以某种方式存储计时器/计数器以停止对流中的每个元素访问远程数据库。 每次refdata更改时,使用新的广播变量重新启动Spark上下文。 […]