为什么我看不到Kafka Streams reduce方法的任何输出?
给出以下代码:
KStream stream = builder.stream(Serdes.String(), customSerde, "test_in"); stream .groupByKey(Serdes.String(), customSerde) .reduce(new CustomReducer(), "reduction_state") .print(Serdes.String(), customSerde);
我在Reducer的apply方法中有一个println
语句,当我希望减少时会成功打印出来。 但是,上面显示的最终打印语句不显示任何内容。 同样,如果我使用方法而不是print
,我在目标主题中看不到任何消息。
在reduce语句之后我需要什么来查看减少的结果? 如果一个值被推送到输入,我不希望看到任何东西。 如果按下具有相同键的第二个值,我希望减少器应用(它确实如此),并且我还期望减少的结果继续到处理管道中的下一步。 如上所述,我在管道的后续步骤中没有看到任何内容,我不明白为什么。
从Kafka 0.10.1.0
所有聚合运算符都使用内部重复数据删除缓存来减少结果KTable changelog流的负载。 例如,如果您使用相同的密钥直接计数和处理两个记录,则完整的更改日志流将为
。
使用新的缓存function,缓存将接收
并存储它,但不会立即将其发送到下游。 当计算
,它将替换缓存的第一个条目。 根据缓存大小,不同密钥数,吞吐量和提交间隔,缓存会向下游发送条目。 这种情况发生在单个密钥条目的缓存逐出或缓存的完全刷新(向下游发送所有条目)。 因此,KTable更改日志可能只显示
(因为
重复删除)。
您可以通过Streams配置参数StreamConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG
来控制缓存的大小。 如果将值设置为零,则完全禁用缓存,KTable更改日志将包含所有更新(有效地提供0.10.1.0
行为)。
汇编文档包含一个更详细地解释缓存的部分:
- Kafka Log4j appender没有发送消息
- 无法读取工件描述符:IntelliJ
- 无法在jConsole中看到kafka.consumer和kafka.producer mBean
- Spark Kafka流媒体问题
- 避免apache kafka使用者中重复消息的有效策略
- kafka使用者动态检测添加的主题
- 连接到Apache Kafka多节点群集中的Zookeeper
- Kafka Consumer挂在java的.hasNext
- Apache Kafka和Avro:org.apache.avro.generic.GenericData $ Record无法强制转换为com.harmeetsingh13.java.Customer