How to use multiple consumers in Kafka?

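I am a new student studying Kafka, and I've run into some fundamental issues with understanding multiple consumers. Articles, documentation, and so on have not been much help so far.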

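One thing I have tried is writing my own high-level Kafka producer and consumer and running them simultaneously: the producer publishes 100 simple messages to a topic and my consumer retrieves them. That works, but when I introduce a second consumer on the same topic the messages were just published to, it receives no messages.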

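My understanding was that for each topic you can have consumers from separate consumer groups, and each consumer group gets a full copy of the messages produced to that topic. Is this correct? If not, what is the proper way to set up multiple consumers? This is the consumer class I have written so far: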

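    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.RoundRobinAssignor;

    public class AlternateConsumer extends Thread {
        private final KafkaConsumer<Integer, String> consumer;
        private final String topic;
        private final Boolean isAsync = false;

        public AlternateConsumer(String topic, String consumerGroup) {
            Properties properties = new Properties();
            properties.put("bootstrap.servers", "localhost:9092");
            properties.put("group.id", consumerGroup);
            // Current clients expect an assignor class name here rather than "roundrobin".
            properties.put("partition.assignment.strategy", RoundRobinAssignor.class.getName());
            properties.put("enable.auto.commit", "true");
            properties.put("auto.commit.interval.ms", "1000");
            properties.put("session.timeout.ms", "30000");
            properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
            properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            consumer = new KafkaConsumer<>(properties);
            // subscribe() takes a collection of topic names in current clients.
            consumer.subscribe(Collections.singletonList(topic));
            this.topic = topic;
        }

        @Override
        public void run() {
            while (true) {
                // Wait up to 100 ms for records instead of busy-spinning with poll(0).
                ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<Integer, String> record : records) {
                    System.out.println("We received message: " + record.value() + " from topic: " + record.topic());
                }
            }
        }
    }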

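Also, I noticed that I was originally testing the above consumer against a topic 'test' with only one partition. When I added another consumer to an existing consumer group, say 'testGroup', this triggered a Kafka rebalance, which increased my consumption latency considerably, on the order of seconds. I assumed this was a rebalancing issue because I only had a single partition, but when I created a new topic, 'multiplepartitions', with, say, 6 partitions, similar problems arose: adding more consumers to the same consumer group caused latency issues. I have looked around, and people are telling me I should be using a multithreaded consumer. Can anyone shed some light on that?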

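I think your problem lies with the auto.offset.reset property. When a new consumer reads from a partition and there is no previously committed offset, auto.offset.reset decides what the starting offset should be. If it is set to "largest" (the default), the consumer starts reading at the latest (last) message. If it is set to "smallest", it gets the first available message. (These are the old consumer's values; the newer Java KafkaConsumer used in the question's code calls them "latest", the default, and "earliest".) So if your second consumer belongs to a group with no committed offsets, it starts at the end of the log and never sees the 100 messages that were published before it joined.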

So add:

    properties.put("auto.offset.reset", "smallest"); // use "earliest" with the newer KafkaConsumer

and try again.
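The rule described in this answer can be modeled in a few lines. This is a minimal sketch in plain Java, not the Kafka client itself; the class `OffsetResetModel` and method `startingOffset` are hypothetical names. It assumes the reset only matters when the group has no committed offset and ignores the out-of-range case, which in real Kafka also triggers the reset.

```java
import java.util.OptionalLong;

/**
 * Hypothetical model (no Kafka involved) of how a consumer picks its starting
 * position on a partition. Accepts both the old consumer's values
 * ("smallest"/"largest") and the newer KafkaConsumer's ("earliest"/"latest").
 */
public class OffsetResetModel {

    static long startingOffset(OptionalLong committed, String autoOffsetReset,
                               long logStartOffset, long logEndOffset) {
        // A previously committed offset for this group always wins in this model.
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        switch (autoOffsetReset) {
            case "smallest":
            case "earliest":
                return logStartOffset; // first message still retained in the log
            case "largest":
            case "latest":
                return logEndOffset;   // only messages produced from now on
            default:
                throw new IllegalArgumentException("Unsupported auto.offset.reset: " + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        // 100 messages were already published before the new group started.
        long logStart = 0;
        long logEnd = 100;
        long fromLargest = startingOffset(OptionalLong.empty(), "largest", logStart, logEnd);
        long fromSmallest = startingOffset(OptionalLong.empty(), "smallest", logStart, logEnd);
        System.out.println("new group, largest:  sees " + (logEnd - fromLargest) + " old messages");
        System.out.println("new group, smallest: sees " + (logEnd - fromSmallest) + " old messages");
    }
}
```

Running `main` shows the new group seeing 0 of the existing messages with "largest" and all 100 with "smallest", which matches the symptom described in the question.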

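The documentation says: "if you provide more threads than there are partitions on the topic, some threads will never see a message". Can you add partitions to your topic? My consumer group's thread count equals the number of partitions in my topic, and every thread is receiving messages.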

Here is my topic configuration:

    buffalo-macbook10:kafka_2.10-0.8.2.1 aakture$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic recent-wins
    Topic:recent-wins   PartitionCount:3    ReplicationFactor:1 Configs:
        Topic: recent-wins  Partition: 0    Leader: 0   Replicas: 0 Isr: 0
        Topic: recent-wins  Partition: 1    Leader: 0   Replicas: 0 Isr: 0
        Topic: recent-wins  Partition: 2    Leader: 0   Replicas: 0 Isr: 0

And my consumer:

    package com.cie.dispatcher.services;

    import com.cie.dispatcher.model.WinNotification;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.google.inject.Inject;
    import io.dropwizard.lifecycle.Managed;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    /**
     * This will create three threads, assign them to a "group" and listen for notifications on a topic.
     * The current setup has three partitions in Kafka, so we need one thread per partition (as recommended
     * by the Kafka folks). This implements the Dropwizard Managed interface, so it can be started and
     * stopped by the Dropwizard lifecycle manager.
     *
     * Created by aakture on 6/15/15.
     */
    public class KafkaTopicListener implements Managed {
        private static final Logger LOG = LoggerFactory.getLogger(KafkaTopicListener.class);
        private final ConsumerConnector consumer;
        private final String topic;
        private ExecutorService executor;
        private final int threadCount;
        private final WinNotificationWorkflow winNotificationWorkflow;
        private final ObjectMapper objectMapper;

        @Inject
        public KafkaTopicListener(String a_zookeeper, String a_groupId, String a_topic, int threadCount,
                                  WinNotificationWorkflow winNotificationWorkflow, ObjectMapper objectMapper) {
            consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                    createConsumerConfig(a_zookeeper, a_groupId));
            this.topic = a_topic;
            this.threadCount = threadCount;
            this.winNotificationWorkflow = winNotificationWorkflow;
            this.objectMapper = objectMapper;
        }

        /**
         * Creates the config for a connection.
         *
         * @param zookeeper the host:port for ZooKeeper, "localhost:2181" for example.
         * @param groupId   the group id to use for the consumer group. Can be anything; Kafka uses it
         *                  to organize the consumer threads.
         * @return the config props
         */
        private static ConsumerConfig createConsumerConfig(String zookeeper, String groupId) {
            Properties props = new Properties();
            props.put("zookeeper.connect", zookeeper);
            props.put("group.id", groupId);
            props.put("zookeeper.session.timeout.ms", "400");
            props.put("zookeeper.sync.time.ms", "200");
            props.put("auto.commit.interval.ms", "1000");
            return new ConsumerConfig(props);
        }

        @Override
        public void stop() {
            if (consumer != null) consumer.shutdown();
            if (executor == null) return; // start() was never called, nothing to wait for
            executor.shutdown();
            try {
                if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                    LOG.info("Timed out waiting for consumer threads to shut down, exiting uncleanly");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOG.info("Interrupted during shutdown, exiting uncleanly");
            }
            LOG.info("{} shutdown successfully", this.getClass().getName());
        }

        /** Starts the listener: requests threadCount streams and runs each on its own worker thread. */
        @Override
        public void start() {
            Map<String, Integer> topicCountMap = new HashMap<>();
            topicCountMap.put(topic, threadCount);
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
            List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
            executor = Executors.newFixedThreadPool(threadCount);
            int threadNumber = 0;
            for (final KafkaStream<byte[], byte[]> stream : streams) {
                executor.submit(new ListenerThread(stream, threadNumber));
                threadNumber++;
            }
        }

        private class ListenerThread implements Runnable {
            private final KafkaStream<byte[], byte[]> m_stream;
            private final int m_threadNumber;

            public ListenerThread(KafkaStream<byte[], byte[]> a_stream, int a_threadNumber) {
                m_threadNumber = a_threadNumber;
                m_stream = a_stream;
            }

            public void run() {
                try {
                    LOG.info("started listener thread: {}", m_threadNumber);
                    ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
                    // hasNext() blocks until a message arrives (consumer.timeout.ms is not set here).
                    while (it.hasNext()) {
                        String message = null;
                        try {
                            message = new String(it.next().message(), StandardCharsets.UTF_8);
                            LOG.info("receive message by " + m_threadNumber + " : " + message);
                            WinNotification winNotification = objectMapper.readValue(message, WinNotification.class);
                            winNotificationWorkflow.process(winNotification);
                        } catch (Exception ex) {
                            LOG.error("error processing queue for message: " + message, ex);
                        }
                    }
                    LOG.info("Shutting down listener thread: " + m_threadNumber);
                } catch (Exception ex) {
                    LOG.error("error:", ex);
                }
            }
        }
    }
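To see why threads beyond the partition count "never see a message", here is a simplified sketch of range-style assignment for one topic within one consumer group. It is loosely modeled on Kafka's RangeAssignor but is not that class; `RangeAssignmentSketch` and `assign` are hypothetical names, and consumers are identified by index rather than member id.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of range-style partition assignment inside ONE consumer
 * group for ONE topic. Each partition goes to exactly one consumer (or stream/thread).
 */
public class RangeAssignmentSketch {

    /** Returns, for each consumer index, the partitions it owns. */
    static List<List<Integer>> assign(int numPartitions, int numConsumers) {
        List<List<Integer>> result = new ArrayList<>();
        int perConsumer = numPartitions / numConsumers;
        int extra = numPartitions % numConsumers;
        int next = 0;
        for (int c = 0; c < numConsumers; c++) {
            // The first `extra` consumers receive one additional partition.
            int count = perConsumer + (c < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                owned.add(next++);
            }
            result.add(owned);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println("3 partitions, 3 consumers: " + assign(3, 3)); // [[0], [1], [2]]
        System.out.println("3 partitions, 5 consumers: " + assign(3, 5)); // [[0], [1], [2], [], []]
        System.out.println("6 partitions, 4 consumers: " + assign(6, 4)); // [[0, 1], [2, 3], [4], [5]]
    }
}
```

With the three-partition `recent-wins` topic above, three threads each own one partition, while a fourth or fifth thread would be assigned nothing and sit idle.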

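If you want multiple consumers to consume the same messages (like a broadcast), create them with different consumer groups, and set auto.offset.reset to smallest in the consumer config. If you want multiple consumers to share the work in parallel (dividing the messages among them), create at least as many partitions as there are consumers. Within a consumer group, a partition can be consumed by at most one consumer process, but one consumer can consume multiple partitions.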
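The two patterns in this answer can be illustrated with a toy in-memory model. This is a sketch only: `GroupSemanticsModel` and `deliver` are hypothetical names, no Kafka API is involved, and partitions are dealt out round-robin within a group purely for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model of consumer-group semantics:
 * - different group.id values: every group sees every message (broadcast);
 * - same group.id: the group's partitions are split among its members (work sharing).
 */
public class GroupSemanticsModel {

    /** Returns member name -> messages it would receive, for a single group. */
    static Map<String, List<String>> deliver(List<List<String>> partitions, List<String> members) {
        Map<String, List<String>> received = new LinkedHashMap<>();
        for (String m : members) {
            received.put(m, new ArrayList<>());
        }
        for (int p = 0; p < partitions.size(); p++) {
            // Each partition is owned by exactly one member of this group.
            String owner = members.get(p % members.size());
            received.get(owner).addAll(partitions.get(p));
        }
        return received;
    }

    public static void main(String[] args) {
        // A topic with 2 partitions holding 4 messages in total.
        List<List<String>> topic = List.of(List.of("m0", "m2"), List.of("m1", "m3"));

        // Broadcast: two groups with one consumer each; both receive all 4 messages.
        System.out.println("groupA: " + deliver(topic, List.of("a1")));
        System.out.println("groupB: " + deliver(topic, List.of("b1")));

        // Work sharing: one group with two consumers; each receives one partition.
        System.out.println("groupC: " + deliver(topic, List.of("c1", "c2")));
    }
}
```

Groups are evaluated independently here, which mirrors the fact that each group tracks its own offsets; adding a third member to `groupC` would leave it with an empty list, since the topic has only two partitions.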