Kafka: can't create multiple stream consumers

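I'm just getting started with Kafka 0.8 beta 1. I have a really simple example up and running; the problem is that I can only get one message consumer to work, not several. That is, the runSingleWorker() method works, but the run() method does not: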

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    import org.springframework.context.ApplicationContext;
    import org.springframework.context.annotation.AnnotationConfigApplicationContext;

    import com.truecar.inventory.worker.core.application.config.AppConfig;

    public class ConsumerThreadPool {

        private final ConsumerConnector consumer;
        private final String topic;
        private ExecutorService executor;
        private static ApplicationContext context =
                new AnnotationConfigApplicationContext(AppConfig.class);

        public ConsumerThreadPool(String topic) {
            consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                    (ConsumerConfig) context.getBean("consumerConfig"));
            this.topic = topic;
        }

        public void shutdown() {
            if (consumer != null) consumer.shutdown();
            if (executor != null) executor.shutdown();
        }

        // Multi-stream version: request numThreads streams for the topic
        // and hand each stream to its own worker thread.
        public void run(Integer numThreads) {
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topic, numThreads);
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                    consumer.createMessageStreams(topicCountMap);
            List<KafkaStream<byte[], byte[]>> topicListeners = consumerMap.get(topic);

            executor = Executors.newFixedThreadPool(numThreads);
            for (int i = 0; i < numThreads; i++) {
                KafkaStream<byte[], byte[]> stream = topicListeners.get(i);
                executor.submit(new Consumer(stream, i));
            }
        }

        // Single-stream version: consume one stream on the calling thread.
        public void runSingleWorker(Integer numThreads) {
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topic, 1);
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                    consumer.createMessageStreams(topicCountMap);
            KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (true) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                while (it.hasNext()) {
                    System.out.println(new String(it.next().message()));
                }
            }
        }
    }

And inside my toy consumer:

    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;

    public class Consumer implements Runnable {

        private final KafkaStream<byte[], byte[]> kafkaStream;
        private final Integer threadNumber;

        public Consumer(KafkaStream<byte[], byte[]> kafkaStream, Integer threadNumber) {
            this.threadNumber = threadNumber;
            this.kafkaStream = kafkaStream;
        }

        public void run() {
            ConsumerIterator<byte[], byte[]> it = kafkaStream.iterator();
            System.out.println("Created iterator " + it.toString() + " thread number " + threadNumber);
            while (true) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    break;
                }
                // Print every message currently available on this stream.
                while (it.hasNext()) {
                    System.out.println("Thread " + threadNumber + ": " + new String(it.next().message()));
                }
            }
            System.out.println("Shutting down Thread: " + threadNumber);
        }
    }

The problem is that the worker pool does not receive any messages:

    Created iterator empty iterator thread number 3
    Created iterator empty iterator thread number 6
    Created iterator empty iterator thread number 9
    Created iterator empty iterator thread number 7
    Created iterator empty iterator thread number 0
    Created iterator empty iterator thread number 0
    Created iterator empty iterator thread number 8
    Created iterator empty iterator thread number 3
    ...

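When I add messages through the command-line producer, they are printed by the single-threaded worker version, but nothing is printed in the multi-stream case. What is going on here, and how can I fix it?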

By the way, the pom.xml for Kafka 0.8 is not a valid POM and does not pull in its dependencies, so here is a POM with the complete set of dependencies:

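    <project>
        <modelVersion>4.0.0</modelVersion>
        <groupId>group1</groupId>
        <artifactId>artifact1</artifactId>
        <version>0.1.0</version>
        <packaging>jar</packaging>
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <spring.version>3.2.4.RELEASE</spring.version>
        </properties>
        <dependencies>
            <dependency><groupId>org.springframework</groupId><artifactId>spring-core</artifactId><version>3.2.4.RELEASE</version></dependency>
            <dependency><groupId>org.springframework</groupId><artifactId>spring-context</artifactId><version>3.2.4.RELEASE</version></dependency>
            <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.9.2</artifactId><version>0.8.0-beta1</version></dependency>
            <dependency><groupId>javax.inject</groupId><artifactId>javax.inject</artifactId><version>1</version></dependency>
            <dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>2.9.2</version></dependency>
            <dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency>
            <dependency><groupId>com.101tec</groupId><artifactId>zkclient</artifactId><version>0.3</version></dependency>
            <dependency><groupId>com.yammer.metrics</groupId><artifactId>metrics-core</artifactId><version>2.2.0</version></dependency>
        </dependencies>
        <build>
            <finalName>inventory-core</finalName>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.0</version>
                    <configuration><source>1.7</source><target>1.7</target></configuration>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-jar-plugin</artifactId>
                    <configuration>
                        <archive><manifest><mainClass>com.truecar.inventory.worker.core.application.Starter</mainClass></manifest></archive>
                    </configuration>
                </plugin>
                <plugin>
                    <groupId>org.dstovall</groupId>
                    <artifactId>onejar-maven-plugin</artifactId>
                    <version>1.4.4</version>
                    <executions>
                        <execution>
                            <configuration><onejarVersion>0.97</onejarVersion><classifier>onejar</classifier></configuration>
                            <goals><goal>one-jar</goal></goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
        <pluginRepositories>
            <pluginRepository>
                <id>onejar-maven-plugin.googlecode.com</id>
                <url>http://onejar-maven-plugin.googlecode.com/svn/mavenrepo</url>
            </pluginRepository>
        </pluginRepositories>
    </project>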

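This is probably too late for the asker, but it may be useful to other developers. It looks like you are using a single partition for several consumers, which doesn't work: within a consumer group each partition is consumed by only one stream, so with one partition at most one of the streams created in run() can receive messages and the others stay idle. Quoting the documentation: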

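"Since there are many partitions this still balances the load over many consumer instances. Note however that there cannot be more consumer instances than partitions."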
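To see why the extra streams stay idle, here is a small plain-Java simulation that spreads partitions over streams in contiguous ranges. It is only a sketch: the class and method names are hypothetical, it does not call any Kafka API, and it approximates rather than reproduces the rebalancing code of the 0.8 high-level consumer. With one partition and ten streams, stream 0 owns partition 0 and every other stream owns nothing, which matches the idle workers in the question.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of a range-style partition assignment (not Kafka code).
public class PartitionAssignmentSketch {

    // Split partitions 0..numPartitions-1 over numStreams streams. Each stream
    // gets a contiguous block; the first (numPartitions % numStreams) streams
    // get one extra partition. Streams past the partition count get an empty list.
    public static List<List<Integer>> assign(int numPartitions, int numStreams) {
        List<List<Integer>> result = new ArrayList<>();
        int perStream = numPartitions / numStreams;
        int withExtra = numPartitions % numStreams;
        for (int i = 0; i < numStreams; i++) {
            int start = perStream * i + Math.min(i, withExtra);
            int count = perStream + (i < withExtra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int p = start; p < start + count; p++) {
                owned.add(p);
            }
            result.add(owned);
        }
        return result;
    }

    public static void main(String[] args) {
        // One partition, ten streams: only stream 0 gets any partition.
        List<List<Integer>> assignment = assign(1, 10);
        for (int i = 0; i < assignment.size(); i++) {
            System.out.println("stream " + i + " -> partitions " + assignment.get(i));
        }
    }
}
```

Calling assign(10, 3) instead gives the three streams partitions [0, 1, 2, 3], [4, 5, 6] and [7, 8, 9], so every stream has work once the topic has at least as many partitions as streams.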

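So, when planning your consumers, think about how your messages are divided among partitions, and create the topic with at least as many partitions as the number of streams you want to consume in parallel. In most cases you should partition by some higher-level grouping key, or simply let the default assign partitions randomly.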
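One common form of "higher-level grouping" is to give each message a key and pick the partition from a hash of that key: messages with the same key always land in the same partition (so their relative order is kept), while different keys spread across partitions. The sketch below is plain Java with a hypothetical partitionFor helper and made-up keys. Kafka 0.8's default partitioner does something similar (the key's hashCode modulo the partition count), but treat this as an illustration of the idea, not Kafka's own code.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of hash-based key partitioning (not Kafka code).
public class KeyPartitioningSketch {

    // Map a message key to a partition. Masking the sign bit keeps the result
    // non-negative even when hashCode() is Integer.MIN_VALUE, a case where
    // Math.abs would still return a negative number.
    public static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        String[] userIds = {"user-1", "user-2", "user-3", "user-1", "user-7", "user-2"};

        // Count messages per partition; repeated keys hit the same partition.
        Map<Integer, Integer> counts = new TreeMap<>();
        for (String id : userIds) {
            int p = partitionFor(id, numPartitions);
            Integer old = counts.get(p);
            counts.put(p, old == null ? 1 : old + 1);
            System.out.println(id + " -> partition " + p);
        }
        System.out.println("messages per partition: " + counts);
    }
}
```

The exact partition numbers printed depend on String.hashCode() and the partition count; what matters is that "user-1" and "user-2" each map to one fixed partition every time.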