kafka KStream – 采用n秒计数的拓扑

我有一个JSON对象流,我键入一些值的哈希值。 我希望在n秒(10?60?)间隔内按键计数,并使用这些值进行一些模式分析。

我的拓扑: K->aggregateByKey(n seconds)->process()

在这个process - init()步骤Ive调用了ProcessorContent.schedule(60 * 1000L) ,希望调用.punctuate() 。 从这里开始,我将遍历内部哈希中的值并相应地执行操作。

我看到值通过聚合步骤并命中process()函数,但永远不会调用.punctuate()


码:

 KStreamBuilder kStreamBuilder = new KStreamBuilder(); KStream opxLines = kStreamBuilder.stream(TOPIC); KStream mapped = opxLines.map(new ReMapper()); KTable<Windowed, String> ktRtDetail = mapped.aggregateByKey( new AggregateInit(), new OpxAggregate(), TimeWindows.of("opx_aggregate", 60000)); ktRtDetail.toStream().process(new ProcessorSupplier<Windowed, String>() { @Override public Processor<Windowed, String> get() { return new AggProcessor(); } }); KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig); kafkaStreams.start(); 

AggregateInit()返回null。

我想我可以用简单的计时器做.punctuate()等效,但我想知道为什么这段代码没有按照我希望的方式工作。

我认为这与kafka集群的不正确设置有关。 将文件描述符计数更改为比默认值(1024 – > 65535)高得多的值后,这似乎符合规范。