Kafka Consumer挂在java的.hasNext

我在java中有一个简单的Kafka Consumer,代码如下

public void run() { ConsumerIterator it = m_stream.iterator(); while (it.hasNext()&& !done){ try { System.out.println("Parsing data"); byte[] data = it.next().message(); System.out.println("Found data: "+data); values.add(data); // array list } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } } done = true; } 

发布消息时,会成功读取数据,但是当它返回到检查it.hasNext()时,它会保持挂起状态并且永远不会返回。

什么可能拖延这个?

m_stream是一个获得如下的KafkaStream:

 Map topicCountMap = new HashMap(); topicCountMap.put(topic, new Integer(a_numThreads)); Map<String, List<KafkaStream>> consumerMap = consumer.createMessageStreams(topicCountMap); List<KafkaStream> streams = consumerMap.get(topic); executor = Executors.newFixedThreadPool(a_numThreads); for (final KafkaStream stream : streams) { // m_stream is one of these streams } 

解决方案是添加属性

“consumer.timeout.ms”

现在,当达到超时时,抛出ConsumerTimeoutException

方法hasNext()是阻塞的。

您可以在属性consumer.timeout.ms更改阻止的超时

请注意,当超时到期时,它将抛出TimeoutException

会阅读这些关于消费者的文档: https : //cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer +实施例