如何从Java中获取kafka服务器的主题列表

我正在使用kafka 0.8版本,非常新的。

我想知道在kafka server创建的主题列表及其元数据。 有没有API可以找到这个?

基本上,我需要编写一个Java消费者,它应该自动发现kafka server任何主题。有API来获取TopicMetadata ,但这需要topic的名称作为输入参数。我需要服务器中存在的所有主题的信息。

一个好的起点是Kafka附带的示例shell脚本。 在发行版的/ bin目录中,您可以使用一些shell脚本,其中一个是./kafka-topic-list.sh如果在未指定主题的情况下运行该脚本,它将返回包含其元数据的所有主题。 请参阅: https : //github.com/apache/kafka/blob/0.8/bin/kafka-list-topic.sh

该shell脚本依次运行: https : //github.com/apache/kafka/blob/0.8/core/src/main/scala/kafka/admin/ListTopicCommand.scala

以上都是对0.8 Kafka版本的引用,所以如果你使用的是不同版本(甚至是点差),请务必在github上使用相应的分支/标签

与Kafka 0.9.0

您可以使用提供的使用者方法listTopics()列出服务器中的主题;

例如。

 Map > topics; Properties props = new Properties(); props.put("bootstrap.servers", "1.2.3.4:9092"); props.put("group.id", "test-consumer-group"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer consumer = new KafkaConsumer(props); topics = consumer.listTopics(); consumer.close(); 

我认为这是最好的方法:

 ZkClient zkClient = new ZkClient("zkHost:zkPort"); List topics = JavaConversions.asJavaList(ZkUtils.getAllTopics(zkClient)); 

如果你想从Zookeeper中提取代理或其他kafka信息,那么kafka.utils.ZkUtils提供了一个很好的界面。 这是我要列出所有zookeeper经纪人的代码(那里有很多其他方法):

 List listBrokers() { final ZkConnection zkConnection = new ZkConnection(connectionString); final int sessionTimeoutMs = 10 * 1000; final int connectionTimeoutMs = 20 * 1000; final ZkClient zkClient = new ZkClient(connectionString, sessionTimeoutMs, connectionTimeoutMs, ZKStringSerializer$.MODULE$); final ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false); scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllBrokersInCluster()); } 

使用Scala:

 import java.util.{Properties} import org.apache.kafka.clients.consumer.KafkaConsumer object KafkaTest { def main(args: Array[String]): Unit = { val brokers = args(0) val props = new Properties(); props.put("bootstrap.servers", brokers); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); val consumer = new KafkaConsumer[String, String](props); val topics = consumer.listTopics().keySet(); println(topics) } } 

您可以使用zookeeper API获取代理列表,如下所述:

  ZooKeeper zk = new ZooKeeper("zookeeperhost, 10000, null); List ids = zk.getChildren("/brokers/ids", false); List brokerList = new ArrayList<>(); ObjectMapper objectMapper = new ObjectMapper(); for (String id : ids) { Map map = objectMapper.readValue(zk.getData("/brokers/ids/" + id, false, null), Map.class); brokerList.add(map); } 

使用此代理列表可以使用以下链接获取所有主题

https://cwiki.apache.org/confluence/display/KAFKA/Finding+Topic+and+Partition+Leader