Storm KafkaSpout停止使用来自Kafka Topic的消息

我的问题是Storm KafkaSpout在一段时间后停止使用来自Kafka主题的消息。 在storm中启用调试时,我得到如下日志文件:

2016-07-05 03:58:26.097 oasdtask [INFO] Emitting:packet_spout __metrics [#object [org.apache.storm.metric.api.IMetricsConsumer $ TaskInfo 0x2c35b34f“org.apache.storm.metric.api.IMetricsConsumer $ TaskInfo @ 2c35b34f“] [#object [org.apache.storm.metric.api.IMetricsConsumer $ DataPoint 0x798f1e35”[__ack-count = {default = 0}]“] #object [org.apache.storm.metric.api.IMetricsConsumer $ DataPoint 0x230867ec“[__sendqueue = {sojourn_time_ms = 0.0,write_pos = 5411461,read_pos = 5411461,overflow = 0,arrival_rate_secs = 0.0,capacity = 1024,population = 0}]”] #object [org.apache.storm.metric。 api.IMetricsConsumer $ DataPoint 0x7cdec8eb“[__ complete-latency = {default = 0.0}]”] #object [org.apache.storm.metric.api.IMetricsConsumer $ DataPoint 0x658fc59“[__skipped-max-spout = 0]”]# object [org.apache.storm.metric.api.IMetricsConsumer $ DataPoint 0x3c1f3a50“[__receive = {sojourn_time_ms = 4790.5,write_pos = 2468305,read_pos = 2468304,overflow = 0,arrival_rate_secs = 0.20874647740319383,capacity = 1024,population = 1}] “] #object [org.apac he.storm.metric.api.IMetricsConsumer $ DataPoint 0x262d7906“[__skipped-inactive = 0]”] #object [org.apache.storm.metric.api.IMetricsConsumer $ DataPoint 0x73648c7e“[kafkaPartition = {Partition {host = slave103: 9092,topic = packet,partition = 12} / fetchAPICallCount = 0,Partition {host = slave103:9092,topic = packet,partition = 12} / fetchAPILatencyMax = null,Partition {host = slave103:9092,topic = packet,partition = 12} / lostMessageCount = 0,Partition {host = slave103:9092,topic = packet,partition = 12} / fetchAPILatencyMean = null,Partition {host = slave103:9092,topic = packet,partition = 12} / fetchAPIMessageCount = 0}] “] #object [org.apache.storm.metric.api.IMetricsConsumer $ DataPoint 0x4e43df61”[kafkaOffset = {packet / totalLatestCompletedOffset = 154305947,packet / partition_12 / spoutLag = 82472754,packet / totalEarliestTimeOffset = 233919465,packet / partition_12 / earliestTimeOffset = 233919465,packet / partition_12 / latestEmittedOffset = 154307691,packet / partition_12 / latestTimeOffset = 236778701,packet / totalLatestEmittedOffset = 154307691,pa cket / partition_12 / latestCompletedOffset = 154305947,packet / totalLatestTimeOffset = 236778701,packet / totalSpoutLag = 82472754}]“] #object [org.apache.storm.metric.api.IMetricsConsumer $ DataPoint 0x49fe816b”[__transfer-count = {__ack_init = 0 ,default = 0,__ metric = 0}]“] #object [org.apache.storm.metric.api.IMetricsConsumer $ DataPoint 0x63e2bdc0”[__fail-count = {}]“] #object [org.apache.storm.metric .api.IMetricsConsumer $ DataPoint 0x3b17bb7b“[__ skipped-throttle = 1086120]”] #object [org.apache.storm.metric.api.IMetricsConsumer $ DataPoint 0x1315a68c“[__emit-count = {__ bag_init = 0,default = 0,__ metrictrics = 0}]“]]]

2016-07-05 03:58:55.042 oasdexecutor [INFO] 处理收到的消息FOR -2 TUPLE :source:__system:-1,stream:__tick,id:{},[30]

2016-07-05 03:59:25.042 oasdexecutor [INFO] 处理收到的消息FOR -2 TUPLE :source:__system:-1,stream:__tick,id:{},[30]

2016-07-05 03:59:25.946 oasdexecutor [INFO] 处理收到的消息FOR -2 TUPLE :source:__system:-1,stream:__ metrics_tick,id:{},[60]

我的测试拓扑非常简单,One KafkaSpout和另一个Counter Bolt。 当拓扑工作正常时, FORTUPLE之间的值是正数; 当拓扑停止使用消息时,该值变为负数。 所以我很好奇是什么导致处理收到消息FOR -2 TUPLE的问题 ,以及如何解决这个问题?

顺便说一句,我的实验环境是:

操作系统:Red Hat Enterprise Linux Server 7.0版(Maipo)
卡夫卡:0.10.0.0
风暴:1.0.1

在stom邮件列表的帮助下,我能够调整KafkaSpout并解决问题。 以下设置适用于我。

config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 2048); config.put(Config.TOPOLOGY_BACKPRESSURE_ENABLE, false); config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384); config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384); 

我通过发送20k-50k批次进行测试,在突发之间暂停1秒。 每条消息是2048字节。

我正在运行3节点集群,我的拓扑有4个spout,主题有64个分区。

在200M消息后它仍在工作….

  1. 检查生产者是否实际上正在写入您期望的主题。
  2. 确保喷口可以在网络级别到达Kafka。 您可以使用Telnet命令进行检查。
  3. 鲸鱼喷水可以到达Zookeeper吗? 使用Telnet再次检查。

资料来源: KafkaSpout没有收到任何卡夫卡的消息

如果以上三个是真的,那么:

Kafka有固定的主题保留窗口。 如果保留已满,则会从尾部丢弃消息。

所以这里可能会发生什么:将数据推送到kafka的速度比消费者消费消息的速度要快。

来源: Storm-kafka鲸鱼喷水不足以处理信息