如何更新火花流中的广播变量?

我相信,我有一个相对常见的火花流用例:

我有一个对象流,我想根据一些参考数据进行过滤

最初,我认为使用广播变量实现这是一件非常简单的事情:

public void startSparkEngine { Broadcast refdataBroadcast = sparkContext.broadcast(getRefData()); final JavaDStream filteredStream = objectStream.filter(obj -> { final ReferenceData refData = refdataBroadcast.getValue(); return obj.getField().equals(refData.getField()); } filteredStream.foreachRDD(rdd -> { rdd.foreach(obj -> { // Final processing of filtered objects }); return null; }); } 

但是,尽管很少, 我的参考数据会定期更改

我的印象是我可以在驱动程序上修改和重新广播我的变量,它会传播给每个worker,但Broadcast对象不是Serializable ,需要是final

我有什么替代品? 我能想到的三个解决方案是:

  1. 将引用数据查找移动到forEachPartitionforEachRdd ,以使其完全驻留在worker上。 但是,参考数据存在于REST API中,因此我还需要以某种方式存储计时器/计数器以停止对流中的每个元素访问远程数据库。

  2. 每次refdata更改时,使用新的广播变量重新启动Spark上下文。

  3. 将参考数据转换为RDD ,然后以我现在流式传输Pair的方式join流,尽管这将向每个对象发送参考数据。

通过@Rohan Aletty扩展答案。 这是一个BroadcastWrapper的示例代码,它根据某些ttl刷新广播变量

 public class BroadcastWrapper { private Broadcast broadcastVar; private Date lastUpdatedAt = Calendar.getInstance().getTime(); private static BroadcastWrapper obj = new BroadcastWrapper(); private BroadcastWrapper(){} public static BroadcastWrapper getInstance() { return obj; } public JavaSparkContext getSparkContext(SparkContext sc) { JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sc); return jsc; } public Broadcast updateAndGet(SparkContext sparkContext){ Date currentDate = Calendar.getInstance().getTime(); long diff = currentDate.getTime()-lastUpdatedAt.getTime(); if (var == null || diff > 60000) { //Lets say we want to refresh every 1 min = 60000 ms if (var != null) var.unpersist(); lastUpdatedAt = new Date(System.currentTimeMillis()); //Your logic to refresh ReferenceData data = getRefData(); var = getSparkContext(sparkContext).broadcast(data); } return var; } } 

您的代码如下所示:

 public void startSparkEngine() { final JavaDStream filteredStream = objectStream.transform(stream -> { Broadcast refdataBroadcast = BroadcastWrapper.getInstance().updateAndGet(stream.context()); stream.filter(obj -> obj.getField().equals(refdataBroadcast.getValue().getField())); }); filteredStream.foreachRDD(rdd -> { rdd.foreach(obj -> { // Final processing of filtered objects }); return null; }); } 

这对我来说也适用于多集群。 希望这可以帮助

几乎每个处理流应用程序的人都需要一种方法来编织(过滤,查找等)参考数据(从数据库,文件等)到流数据。 我们对这两部分进行了部分解决

  1. 查找要在流操作中使用的参考数据

    • 使用所需的缓存TTL创建CacheLookup对象
    • 将它包装在Broadcast中
    • 使用CacheLookup作为流逻辑的一部分

在大多数情况下,这种方法很好,除了以下内容

  1. 更新参考数据

    尽管有这些线程中的建议,但没有明确的方法实现这一点,即:杀死先前的广播变量并创建新的变量。 多个未知数,例如这些操作之间的预期。

这是一个常见的需求,如果有办法将信息发送到广播变量通知更新,它会有所帮助。 这样,就可以使“CacheLookup”中的本地缓存无效

问题的第二部分仍未解决。 如果有任何可行的方法,我会感兴趣

不确定你是否已经尝试过,但我认为可以在不关闭SparkContext情况下实现对广播变量的更新。 通过使用unpersist()方法,广播变量的副本在每个执行器上被删除,并且需要是需要重新广播的变量才能再次访问。 对于您的用例,当您想要更新广播时,您可以:

  1. 等待执行者完成当前的一系列数据

  2. 取消播放广播变量

  3. 更新广播变量

  4. 重新广播以将新的参考数据发送给执行者

我从这篇文章中得到了很多,但最后一个回复的人声称已经让它在本地工作。 重要的是要注意,您可能希望在unpersist上将阻塞设置为true ,以便您可以确保执行程序摆脱旧数据(因此在下一次迭代时不会再次读取过时值)。

最近面临这个问题。 认为它可能对scala用户有帮助..

BroadCastWrapper Scala方式如下例所示。

 import java.io.{ ObjectInputStream, ObjectOutputStream } import org.apache.spark.broadcast.Broadcast import org.apache.spark.streaming.StreamingContext import scala.reflect.ClassTag /* wrapper lets us update brodcast variables within DStreams' foreachRDD without running into serialization issues */ case class BroadcastWrapper[T: ClassTag]( @transient private val ssc: StreamingContext, @transient private val _v: T) { @transient private var v = ssc.sparkContext.broadcast(_v) def update(newValue: T, blocking: Boolean = false): Unit = { v.unpersist(blocking) v = ssc.sparkContext.broadcast(newValue) } def value: T = v.value private def writeObject(out: ObjectOutputStream): Unit = { out.writeObject(v) } private def readObject(in: ObjectInputStream): Unit = { v = in.readObject().asInstanceOf[Broadcast[T]] } } 

每次需要调用update函数来获取新的广播变量。