Kafka KStreams – 处理超时

我试图使用带有TimeWindows.of("name", 30000) .process()批量处理一些KTable值并发送它们。 似乎30秒超过了消费者超时间隔,之后Kafka认为该消费者已经解散并释放分区。

我已经尝试提高轮询频率和提交间隔以避免这种情况:

 config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "5000"); config.put(StreamsConfig.POLL_MS_CONFIG, "5000"); 

不幸的是,这些错误仍在发生:

(很多这些)

 ERROR oakspinternals.RecordCollector - Error sending record to topic kafka_test1-write_aggregate2-changelog org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for kafka_test1-write_aggregate2-changelog-0 

其次是:

 INFO oakcciAbstractCoordinator - Marking the coordinator 12.34.56.7:9092 (id: 2147483547 rack: null) dead for group kafka_test1 WARN oakspinternals.StreamThread - Failed to commit StreamTask #0_0 in thread [StreamThread-1]: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578) 

显然,我需要更频繁地将心跳发送回服务器。 怎么样?

我的拓扑结构是:

 KStreamBuilder kStreamBuilder = new KStreamBuilder(); KStream lines = kStreamBuilder.stream(TOPIC); KTable<Windowed, String> kt = lines.aggregateByKey( new DBAggregateInit(), new DBAggregate(), TimeWindows.of("write_aggregate2", 30000)); DBProcessorSupplier dbProcessorSupplier = new DBProcessorSupplier(); kt.toStream().process(dbProcessorSupplier); KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig); kafkaStreams.start(); 

KTable每隔30秒按键分组值。 在Processor.init()中,我调用context.schedule(30000)

DBProcessorSupplier提供DBProcessor的实例。 这是AbstractProcessor的一个实现,其中提供了所有覆盖。 他们所做的只是LOG,所以我知道每个人都被击中。

这是一个非常简单的拓扑结构,但很明显我在某个地方错过了一个步骤。


编辑:

我知道我可以在服务器端对此进行调整,但我希望有一个客户端解决方案。 我喜欢在客户端退出/死亡时很快就可以使用分区的概念。


编辑:

为了简化问题,我从图中删除了聚合步骤。 它现在只是消费者 – >处理器()。 (如果我将消费者直接发送到.print()它很快就会工作,所以我知道没关系)。 (类似地,如果我通过.print()输出聚合(KTable),它似乎也可以。)

我发现.punctuate() .process()应该每隔30秒调用.punctuate()实际上是阻塞可变长度的时间并且随机输出(如果有的话)。

  • 主程序
  • 调试输出
  • 处理器供应商
  • 处理器

进一步:

我将调试级别设置为’debug’并重新启动。 我看到很多消息:

 DEBUG oakspinternals.StreamTask - Start processing one record [ConsumerRecord  

但是.punctuate()函数中的断点没有被击中。 所以它做了很多工作,但没有让我有机会使用它。

一些澄清:

  • StreamsConfig.COMMIT_INTERVAL_MS_CONFIG是提交间隔的下限,即在提交之后,下一次提交不会在此时间之前发生。 基本上,Kafka Stream试图在这段时间过后尽快提交,但无法保证下一次提交实际需要多长时间。
  • StreamsConfig.POLL_MS_CONFIG用于内部KafkaConsumer#poll()调用,以指定KafkaConsumer#poll()调用的最大阻塞时间。

因此,这两个值对心跳更有帮助。

Kafka Streams在处理记录时遵循“深度优先”策略。 这意味着,在每个记录的poll()之后,将执行拓扑的所有运算符。 假设你有三个连续的地图,那么在下一个/第二个记录被处理之前,将为第一个记录调用所有三个地图。

因此,在第一次poll()所有记录被完全处理之后,将进行下一次poll()调用。 如果你想更频繁地心跳,你需要确保一个poll()调用获取更少的记录,这样处理所有记录所需的时间更少,并且下一个poll()将被提前触发。

您可以使用可通过StreamsConfig指定的StreamsConfig配置参数来完成此操作(请参阅https://kafka.apache.org/documentation.html#consumerconfigs ):

streamConfig.put(ConsumerConfig.XXX,VALUE);

  • max.poll.records :如果减小此值,将轮询较少的记录
  • session.timeout.ms :如果增加此值,则有更多时间处理数据(为了完整性而添加此项,因为它实际上是客户端设置而不是服务器/代理端配置 – 即使您知道此解决方案并且不喜欢 :))

编辑

从Kafka 0.10.1 ,可以(并推荐)在流配置中为消费者和procuder配置添加前缀。 这避免了参数冲突,因为一些参数名称用于消费者和生产者,否则无法区分(并且将同时应用于消费者生产者)。 要为参数添加前缀,可以分​​别使用StreamsConfig#consumerPrefix()StreamsConfig#producerPrefix() 。 例如: streamsConfig.put(StreamsConfig.consumerPrefix(ConsumerConfig.PARAMETER), VALUE);

还有一件事要提到:这个问题中描述的场景是一个已知问题,并且已经有KIP-62为KafkaConsumer引入了一个发送心跳的后台线程,从而将heartbeats与poll()调用分离。 Kafka Streams将在即将发布的版本中利用这一新function。