卡夫卡模式订阅。 新主题没有触发重新平衡

根据关于kafka javadocs的文档,如果我:

  • 订阅模式
  • 创建与模式匹配的主题

应该发生重新平衡,这使得消费者从该新主题中读取。 但那并没有发生。

如果我停止并启动消费者,它确实会选择新主题。 所以我知道新主题与模式匹配。 在https://stackoverflow.com/questions/37120537/whitelist-filter-in-kafka-doesnt-pick-up-new-topics中可能存在此问题的重复,但这个问题无处可去。

我看到kafka日志并没有错误,它只是不会触发重新平衡。 当消费者加入或死亡时触发重新平衡,但是在创建新主题时不会触发(即使将分区添加到现有主题,但这是另一个主题)。

我正在使用kafka 0.10.0.0和“新消费者API”的官方Java客户端,意思是代理GroupCoordinator而不是胖客户端+ zookeeper。

这是示例消费者的代码:

public class SampleConsumer { public static void main(String[] args) throws IOException { KafkaConsumer consumer; try (InputStream props = Resources.getResource("consumer.props").openStream()) { Properties properties = new Properties(); properties.load(props); properties.setProperty("group.id", "my-group"); System.out.println(properties.get("group.id")); consumer = new KafkaConsumer(properties); } Pattern pattern = Pattern.compile("mytopic.+"); consumer.subscribe(pattern, new SampleRebalanceListener()); while (true) { ConsumerRecords records = consumer.poll(1000); for (ConsumerRecord record : records) { System.out.printf("%s %s\n", record.topic(), record.value()); } } } 

}

在制作人中,我正在向名为mytopic1,mytopic2等的主题发送消息。

如果不触发重新平衡,模式几乎没用。

你知道为什么没有发生重新平衡吗?

文档提到“模式匹配将定期针对检查时存在的主题进行。” 事实certificate,“周期性”对应于metadata.max.age.ms属性。 通过将该属性(在我的代码示例中的“consumer.props”内)设置为5000,我可以看到它每5秒检测一次新主题和分区。

这是根据设计,根据这个jira票https://issues.apache.org/jira/browse/KAFKA-3854 :

关于JIRA的最后一点说明,在创建之后,不会将消费者订阅模式的后续创建主题分配给消费者,这似乎与设计一致。 处理该情况需要对相同模式重复subscribe()。

刷新元数据轮询执行故障单中提到的“重复订阅()”。

这是令人困惑的来自Kafka 0.8,其中真正的触发基于zookeper手表,而不是轮询。 对于这种情况,IMO 0.9更多的是降级,而不是“及时”重新平衡,这变成了带有开销的高频率轮询,或者在对新主题/分区作出反应之前很长时间的低频率轮询。