Kafka KStream – 使用带窗口的AbstractProcessor

我希望将来自KStream的窗口批输出组合在一起并将它们写入辅助存储。

我期待看到.punctuate()大约每30秒调用一次。 我得到的是保存在这里 。

(原始文件长达数千行)

总结 – .punctuate()看似随机然后重复调用。 它似乎不符合通过ProcessorContext.schedule()设置的值。


编辑:

另一次运行相同的代码大约每四分钟调用一次.punctuate() 。 这次我没有看到疯狂的重复值。 来源没有变化 – 只是结果不同。

使用以下代码:

主要

 StreamsConfig streamsConfig = new StreamsConfig(config); KStreamBuilder kStreamBuilder = new KStreamBuilder(); KStream lines = kStreamBuilder.stream(TOPIC); lines.process(new BPS2()); KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig); kafkaStreams.start(); 

处理器

 public class BP2 extends AbstractProcessor { private static final Logger LOGGER = LoggerFactory.getLogger(BP2.class); private ProcessorContext context; private final long delay; private final ArrayList values; public BP2(long delay) { LOGGER.debug("BatchProcessor() constructor"); this.delay = delay; values = new ArrayList(); } @Override public void process(String s, String s2) { LOGGER.debug("batched processor s:{} s2:{}", s, s2); values.add(s2); } @Override public void init(ProcessorContext context) { LOGGER.info("init"); super.init(context); values.clear(); this.context = context; context.schedule(delay); } @Override public void punctuate(long timestamp) { super.punctuate(timestamp); LOGGER.info("punctuate ts: {} count: {}", timestamp, values.size()); context().commit(); } } 

ProcessorSupplier

 public class BPS2 implements ProcessorSupplier { private static final Logger LOGGER = LoggerFactory.getLogger(BPS2.class); @Override public Processor get() { try { return new BP2(30000); } catch(Exception exception) { LOGGER.error("Unable to instantiate BatchProcessor()", exception); throw new RuntimeException(); } } } 

编辑:

为了确保我的调试器没有减慢速度,我构建它并在与我的kafka进程相同的盒子上运行它。 这次它甚至没有试图延迟4分钟或更长时间 – 在几秒钟内就输出了对.punctuate()虚假调用。 其中许多(大多数)没有干预调用.process()

更新:这部分答案是针对Kafka版本0.11或更早版本(对于Kafka 1.0及更高版本,见下文)

在Kafka Streams中,标点符号基于流时间 而非 系统时间 (也称为处理时间)。

每个默认流时间事件时间 ,即嵌入在Kafka记录中的时间戳。 由于您没有设置非默认的TimestampExtractor (请参阅http://docs.confluent.io/current/streams/developer-guide.html#optional-configuration-parameters中的 timestamp.extractor ),对punctuate的调用仅取决于关于您处理的记录的事件时间的过程。 因此,如果您需要多个分钟来处理记录的“30秒”(事件时间),则会在不到30秒(挂钟时间)的情况下调用punctuate

这也可以解释您的不规则呼叫模式(即突发和长延迟)。 如果您的数据事件时间“跳转”,并且您的主题中已经完全可以处理您要处理的数据,那么Kafka Streams也会“跳转”内部维护的流时间

我认为,您可以使用WallclockTimestampExtractor解决您的问题(请参阅http://docs.confluent.io/current/streams/developer-guide.html#timestamp-extractor )

还有一点需要提及: 流时间仅在处理数据才会提前 – 如果您的应用程序到达输入主题的末尾并等待数据,则不会调用punctuate 。 即使您使用WallclockTimestampExtractor这也适用。

顺便说一句:目前有关于Streams标点符号行为的讨论: https : //github.com/apache/kafka/pull/1689

回答Kafka 1.0及更高版本

从Kafka 1.0开始,可以根据挂钟时间或事件时间来注册标点符号: https : //kafka.apache.org/10/documentation/streams/developer-guide/processor-api.html#id2

刚读完这个问题的答案,我认为这也是你的答案。 它的要点是:

  1. 流消费者对记录执行轮询
  2. 完全处理所有返回的记录。
  3. 然后使用配置的延迟安排标点回调。

指出标点符号不是固定的时间间隔事件,并且#2采用的时间的变化将导致标点符号的执行周期的等效变化。

….但是读了那个链接,他说它比我好。

好的 – 我认为这是卡夫卡的一个错误。

原因如下:

在我最初的测试中,我使用一台机器来运行ProducerConsumer 。 我会运行Producer几分钟来生成一些测试数据,然后运行我的测试。 这将给出我最初发布的奇怪输出。

然后我决定将Producer推到后台并让它继续运行 。 现在我看到.punctuate()调用之间100%完美的30秒间隔。 没有更多的问题。

换句话说 – 如果kafka服务器没有处理任何入站数据,那么它似乎与运行KStreams进程不一致