Spark流mapWithState超时延迟了吗?

我期望Spark 1.6+的新mapWithState API几乎立即删除超时的对象,但是有一个延迟。

我正在使用下面的JavaStatefulNetworkWordCount的改编版本测试API:

SparkConf sparkConf = new SparkConf() .setAppName("JavaStatefulNetworkWordCount") .setMaster("local[*]"); JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1)); ssc.checkpoint("./tmp"); StateSpec<String, Integer, Integer, Tuple2> mappingFunc = StateSpec.function((word, one, state) -> { if (state.isTimingOut()) { System.out.println("Timing out the word: " + word); return new Tuple2(word, state.get()); } else { int sum = one.or(0) + (state.exists() ? state.get() : 0); Tuple2 output = new Tuple2(word, sum); state.update(sum); return output; } }); JavaMapWithStateDStream<String, Integer, Integer, Tuple2> stateDstream = ssc.socketTextStream(args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER_2) .flatMap(x -> Arrays.asList(SPACE.split(x))) .mapToPair(w -> new Tuple2(w, 1)) .mapWithState(mappingFunc.timeout(Durations.seconds(5))); stateDstream.stateSnapshots().print(); 

和nc一起使用( nc -l -p

当我在nc窗口中输入一个单词时,我看到每秒都会在控制台中打印一个元组。 但是看起来好像超时消息在5s之后打印出来,正如预期的那样根据超时设置。 元组到期所需的时间似乎在5到20之间变化。

我错过了一些配置选项,或者超时可能只与检查点同时执行?

一旦事件超时,它不会立即删除,但只会通过将其保存到’deltaMap’来标记为删除:

 override def remove(key: K): Unit = { val stateInfo = deltaMap(key) if (stateInfo != null) { stateInfo.markDeleted() } else { val newInfo = new StateInfo[S](deleted = true) deltaMap.update(key, newInfo) } } 

然后,收集超时事件并仅在检查点将其发送到输出流。 也就是说:在批处理t超时的事件将仅在下一个检查点出现在输出流中 – 默认情况下,平均为5个批处理间隔后,即批处理t + 5:

  override def checkpoint(): Unit = { super.checkpoint() doFullScan = true } ... removeTimedoutData = doFullScan // remove timedout data only when full scan is enabled ... // Get the timed out state records, call the mapping function on each and collect the // data returned if (removeTimedoutData && timeoutThresholdTime.isDefined) { ... 

实际上只有在有足够数量的元素时才会删除元素,并且正在序列化状态映射时 – 目前只在检查点发生:

  /** Whether the delta chain length is long enough that it should be compacted */ def shouldCompact: Boolean = { deltaChainLength >= deltaChainThreshold } // Write the data in the parent state map while copying the data into a new parent map for // compaction (if needed) val doCompaction = shouldCompact ... 

默认情况下,检查点每10次迭代发生一次,因此在上面的示例中每10秒; 由于你的超时是5秒,事件预计在5-15秒内。

编辑:修正并详细阐述了@YuvalItzchakov的评论

我是否缺少一些配置选项,或者超时可能只与快照同时执行?

每次调用mapWithState (使用您的配置,大约每1秒), MapWithStateRDD将在内部检查过期记录并将其计时。 你可以在代码中看到它:

 // Get the timed out state records, call the mapping function on each and collect the // data returned if (removeTimedoutData && timeoutThresholdTime.isDefined) { newStateMap.getByTime(timeoutThresholdTime.get).foreach { case (key, state, _) => wrappedState.wrapTimingOutState(state) val returned = mappingFunction(batchTime, key, None, wrappedState) mappedData ++= returned newStateMap.remove(key) } } 

(除了执行每个作业所花费的时间之外,事实certificatenewStateMap.remove(key)实际上只标记要删除的文件。有关更多信息,请参阅“编辑”。)

您必须考虑每个阶段计划所需的时间,以及每个执行此阶段实际轮到并运行所需的时间。 这是不准确的,因为它作为一个分布式系统运行,其他因素可以发挥作用,使你的超时比你预期的更准确/更准确。


编辑

正如@etov正确指出的那样, newStateMap.remove(key)实际上并没有从OpenHashMapBasedStateMap[K, S]删除该元素,而只是将其标记为删除。 这也是您看到到期时间加起来的原因。

实际相关的代码片段在这里:

 // Write the data in the parent state map while // copying the data into a new parent map for compaction (if needed) val doCompaction = shouldCompact val newParentSessionStore = if (doCompaction) { val initCapacity = if (approxSize > 0) approxSize else 64 new OpenHashMapBasedStateMap[K, S](initialCapacity = initCapacity, deltaChainThreshold) } else { null } val iterOfActiveSessions = parentStateMap.getAll() var parentSessionCount = 0 // First write the approximate size of the data to be written, so that readObject can // allocate appropriately sized OpenHashMap. outputStream.writeInt(approxSize) while(iterOfActiveSessions.hasNext) { parentSessionCount += 1 val (key, state, updateTime) = iterOfActiveSessions.next() outputStream.writeObject(key) outputStream.writeObject(state) outputStream.writeLong(updateTime) if (doCompaction) { newParentSessionStore.deltaMap.update( key, StateInfo(state, updateTime, deleted = false)) } } // Write the final limit marking object with the correct count of records written. val limiterObj = new LimitMarker(parentSessionCount) outputStream.writeObject(limiterObj) if (doCompaction) { parentStateMap = newParentSessionStore } 

如果应该压缩deltaMap (用doCompaction变量标记),那么(并且只有那时)是从所有已删除的实例中清除的映射。 这种情况多久发生一次? 一个delta超过threadshold:

 val DELTA_CHAIN_LENGTH_THRESHOLD = 20 

这意味着delta链超过20个项目,并且有些项目已被标记为删除。