kafka使用者动态检测添加的主题

我正在使用KafkaConsumer来消费来自Kafka服务器(主题)的消息。

  • 它适用于在启动消费者代码之前创建的主题…

但问题是,如果动态创建的主题(我的意思是说消费者代码开始之后),它将无法工作,但API表示它将支持动态主题创建..这是您的参考链接..

使用的Kafka版本:0.9.0.1

https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

这是JAVA代码……

Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "false"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer consumer = new KafkaConsumer(props); Pattern r = Pattern.compile("siddu(\\d)*"); consumer.subscribe(r, new HandleRebalance()); try { while(true) { ConsumerRecords records = consumer.poll(Long.MAX_VALUE); for (TopicPartition partition : records.partitions()) { List<ConsumerRecord> partitionRecords = records.records(partition); for (ConsumerRecord record : partitionRecords) { System.out.println(partition.partition() + ": " +record.offset() + ": " + record.value()); } long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); } } } finally { consumer.close(); } 

注意:我的主题名称与正则表达式匹配..如果我重新启动消费者,那么它将开始阅读推送到主题的消息…

任何帮助真的很感激……

在apache kafka邮件档案中有一个答案。 我正在复制它:

使用者支持配置选项“metadata.max.age.ms”,它基本上控制提取主题元数据的频率。 默认情况下,此设置相当高(5分钟),这意味着最多需要5分钟才能发现与正则表达式匹配的新主题。 您可以将此值设置得更低,以便更快地发现主题。

所以在你的道具中你可以:

 props.put("metadata.max.age.ms", 5000); 

这将使您的消费者每5秒钟了解一下新主题。

你可以挂进Zookeeper。 查看示例代码 。 实质上,您将在Zookeeper节点/brokers/topics上创建一个观察程序。 当在这里添加新的孩子时,这是一个新的主题被添加,你的观察者将被触发。