Kafka Consumer挂在java的.hasNext
我在java中有一个简单的Kafka Consumer,代码如下
public void run() { ConsumerIterator it = m_stream.iterator(); while (it.hasNext()&& !done){ try { System.out.println("Parsing data"); byte[] data = it.next().message(); System.out.println("Found data: "+data); values.add(data); // array list } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } } done = true; }
发布消息时,会成功读取数据,但是当它返回到检查it.hasNext()时,它会保持挂起状态并且永远不会返回。
什么可能拖延这个?
m_stream是一个获得如下的KafkaStream:
Map topicCountMap = new HashMap(); topicCountMap.put(topic, new Integer(a_numThreads)); Map<String, List<KafkaStream>> consumerMap = consumer.createMessageStreams(topicCountMap); List<KafkaStream> streams = consumerMap.get(topic); executor = Executors.newFixedThreadPool(a_numThreads); for (final KafkaStream stream : streams) { // m_stream is one of these streams }
解决方案是添加属性
“consumer.timeout.ms”
现在,当达到超时时,抛出ConsumerTimeoutException
方法hasNext()
是阻塞的。
您可以在属性consumer.timeout.ms
更改阻止的超时
请注意,当超时到期时,它将抛出TimeoutException
。
会阅读这些关于消费者的文档: https : //cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer +实施例
- 如何在java程序中获取kafka消耗滞后
- kafka-connect错误:无法找到或加载主类
- Kafka – 如何在Producer类中获取失败的消息详细信息
- Spring引导application.yml中的Spring Kafka SSL设置
- 如何使用Java中的Structured Streaming从Kafka反序列化记录?
- Libat上的UnsatisfiedLinkError在使用Kafka Streams进行开发时会破坏DB dll
- 如何在Scala中为Kafka(带分区的commitSync)公开Java方法?
- 是否可以在Kafka 0.8.2中为现有主题添加分区
- 当kafka参与微服务架构时,如何实施合同测试?