连接到Apache Kafka多节点群集中的Zookeeper

我按照以下说明设置了多节点kafka群集。 现在,如何连接到zookeeper? 是否可以从JAVA中的生产者/消费者端连接到一个zookeeper,或者有没有办法连接所有zookeeper节点?

设置多节点Apache ZooKeeper集群

在群集的每个节点上,将以下行添加到文件kafka / config / zookeeper.properties

server.1=zNode01:2888:3888 server.2=zNode02:2888:3888 server.3=zNode03:2888:3888 #add here more servers if you want initLimit=5 syncLimit=2 

在群集的每个节点上,在dataDir属性表示的文件夹中创建名为myid的文件(默认情况下,文件夹为/ tmp / zookeeper)。 myid文件应该只包含znode的id(zNode01为’1’,ZNode02为’2’等等)

设置多代理Apache Kafka集群

在集群的每个节点上,修改文件kafka / config / server.properties中的属性zookeeper.connect:

  zookeeper.connect=zNode01:2181,zNode02:2181,zNode03:2181 

在集群的每个节点上,从文件kafka / config / server.properties修改属性host.name:host.name = zNode0x

在集群的每个节点上,从文件kafka / config / server.properties修改属性broker.id(集群中的每个代理都应具有唯一的ID)

您可以传递生产者或使用者中的所有节点。 Kafka足够智能,它将根据复制因子或分区连接到具有您所需数据的节点

这是消费者代码:

 Properties props = new Properties(); props.put("bootstrap.servers", "acbd.com:9092,defg.com:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("foo", "bar")); while (true) { ConsumerRecords records = consumer.poll(100); for (ConsumerRecord record : records) System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value()); } 

你可以在这里找到更多信息

注意 :这个approch的问题是它将打开多个连接以找出哪个节点保存数据。 对于更强大和可扩展的系统,您可以维护分区号和节点名称的映射,这也有助于加载Balencing。

这是生产者样本

 Properties props = new Properties(); props.put("bootstrap.servers", "acbd.com:9092,defg.com:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer producer = new KafkaProducer<>(props); for(int i = 0; i < 100; i++) producer.send(new ProducerRecord("my-topic", Integer.toString(i), Integer.toString(i))); producer.close(); 

更多信息在这里

无需在Kafka客户端(生产者和消费者)中传递Zookeeper连接属性。

从Kafka-v9及以上版本开始,Kafka Producer和Consumer不与Zookeeper通信。