Kafka消费者配置/性能问题

我正在尝试将kafka作为AWS SQS的替代品。 动机主要是提高性能,其中kafka将消除限制,一次性提取10条消息,上限为256kb。 这是我的用例的高级场景。 我有一堆爬虫正在发送索引文件。 有效载荷的大小平均约为1mb。 爬虫调用SOAP端点,后者又运行生产者代码以将消息提交给kafka队列。 消费者应用程序获取消息并处理它们。 对于我的测试框,我已经为主题配置了30个分区和2个复制。 两个kafka实例正在运行1个zookeeper实例。 卡夫卡版本是0.10.0。

对于我的测试,我在队列中发布了700万条消息。 我创建了一个包含30个消费者线程的消费者组,每个分区一个。 我最初的印象是,与通过SQS获得的相比,这将大大加快处理能力。 不幸的是,事实并非如此。 就我而言,数据处理很复杂,平均需要1-2分钟才能完成。这导致了一系列的分区重新平衡,因为线程无法按时心跳。 我可以在日志引用中看到一堆消息

组full_group的自动偏移提交失败:由于组已经重新平衡并将分区分配给另一个成员,因此无法完成提交。 这意味着后续调用poll()之间的时间比配置的session.timeout.ms长,这通常意味着轮询循环花费了太多时间进行消息处理。 您可以通过增加会话超时或通过max.poll.records减少poll()中返回的批量的最大大小来解决此问题。

这导致多次处理相同的消息。 我尝试使用会话超时,max.poll.records和轮询时间来避免这种情况,但这会减慢整个处理时间。 这是一些配置参数。

 metadata.max.age.ms = 300000 max.partition.fetch.bytes = 1048576 bootstrap.servers = [kafkahost1:9092, kafkahost2:9092] enable.auto.commit = true max.poll.records = 10000 request.timeout.ms = 310000 heartbeat.interval.ms = 100000 auto.commit.interval.ms = 1000 receive.buffer.bytes = 65536 fetch.min.bytes = 1 send.buffer.bytes = 131072 value.deserializer = class com.autodesk.preprocessor.consumer.serializer.KryoObjectSerializer group.id = full_group retry.backoff.ms = 100 fetch.max.wait.ms = 500 connections.max.idle.ms = 540000 session.timeout.ms = 300000 key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer metrics.sample.window.ms = 30000 auto.offset.reset = latest 

我将消费者轮询时间减少到100毫秒。 它减少了重新平衡问题,消除了重复处理,但显着减慢了整个过程。 与使用基于SQS的解决方案的25小时相比,最终需要35小时才能完成所有600万条消息的处理。 每个消费者线程平均每次轮询检索50-60条消息,尽管其中一些有时会轮询0条记录。 当分区中有大量消息时,我不确定这种行为。 同一个线程能够在后续迭代期间获取消息。 这可能是由于重新平衡?

这是我的消费者代码

 while (true) { try{ ConsumerRecords records = consumer.poll(100); for (ConsumerRecord record : records) { if(record.value()!=null){ TextAnalysisRequest textAnalysisObj = record.value(); if(textAnalysisObj!=null){ // Process record PreProcessorUtil.submitPostProcessRequest(textAnalysisObj); } } } }catch(Exception ex){ LOGGER.error("Error in Full Consumer group worker", ex); } 

我理解记录处理部分是我的一个瓶颈。 但我确信这里的一些人有类似处理大处理时间的用例。 我想通过旋转其专用线程中的每个处理器或使用大容量的线程池来进行异步处理,但不确定它是否会在系统中产生很大的负载。 与此同时,我已经看到了一些人们使用暂停和恢复API来执行处理以避免重新平衡问题的实例。

在这种情况下,我真的在寻找一些建议/最佳实践。 特别是,推荐的配置设置围绕听力,请求超时,最大轮询记录,自动提交间隔,轮询间隔等,如果kafka不是我的用例的正确工具,请让我知道。

您可以在与从Kafka读取的线程不同的线程中异步处理消息。 这样自动提交将非常快,Kafka不会削减您的会话。 像这样的东西:

  private final BlockingQueue requests = new LinkedBlockingQueue(); 

在阅读post中:

 while (true) { try{ ConsumerRecords records = consumer.poll(100); for (ConsumerRecord record : records) { if(record.value()!=null){ TextAnalysisRequest textAnalysisObj = record.value(); if(textAnalysisObj!=null){ // Process record requests.offer(textAnalysisObj); } } } } catch(Exception ex){ LOGGER.error("Error in Full Consumer group worker", ex); } 

在处理线程中:

  while (!Thread.currentThread().isInterrupted()) { try { TextAnalysisRequest textAnalysisObj = requests.take(); PreProcessorUtil.submitPostProcessRequest(textAnalysisObj); } catch (InterruptedException e) { LOGGER.info("Process thread interrupted", e); Thread.currentThread().interrupt(); } catch (Throwable t) { LOGGER.warn("Unexpected throwable while processing.", t); } } 

另请参阅此文档,了解通过Kafka发送大量消息的策略: http : //blog.cloudera.com/blog/2015/07/deploying-apache-kafka-a-practical-faq/

简而言之,它表示Kafka在大约10K的小尺寸消息上表现最佳,如果您需要发送更大的消息,最好将它们放在网络存储上,然后通过Kafka发送它们的位置,或者拆分它们。