Kafka – 使用高级消费者实现延迟队列

想要使用高级消费者api实现延迟消费者

大意:

  • 按密钥生成消息(每个消息包含创建时间戳)这可确保每个分区按生产时间排序消息。
  • auto.commit.enable = false(将在每个消息进程后显式提交)
  • 消费一条消息
  • 检查消息时间戳并检查是否已经过了足够的时间
  • 进程消息(此操作永远不会失败)
  • 提交1个偏移量

    while (it.hasNext()) { val msg = it.next().message() //checks timestamp in msg to see delay period exceeded while (!delayedPeriodPassed(msg)) { waitSomeTime() //Thread.sleep or something.... } //certain that the msg was delayed and can now be handled Try { process(msg) } //the msg process will never fail the consumer consumer.commitOffsets //commit each msg } 

对此实施的一些担忧:

  1. 提交每个偏移可能会减慢ZK
  2. consumer.commitOffsets会抛出exception吗? 如果是的话,我将两次使用相同的消息(可以用幂等消息解决)
  3. 问题等待很长时间没有提交偏移量,例如延迟时间是24小时,将从迭代器获得下一个,睡眠24小时,进程和提交(ZK会话超时?)
  4. ZK会话如何保持活跃而不提交新的偏移量? (设置一个配置单元zookeeper.session.timeout.ms可以解决死亡的消费者而不认识它)
  5. 我遗失的任何其他问题?

谢谢!

解决此问题的一种方法是使用不同的主题来推送所有要延迟的邮件。 如果所有延迟的消息都应该在相同的时间延迟之后处理,那么这将非常简单:

 while(it.hasNext()) { val message = it.next().message() if(shouldBeDelayed(message)) { val delay = 24 hours val delayTo = getCurrentTime() + delay putMessageOnDelayedQueue(message, delay, delayTo) } else { process(message) } consumer.commitOffset() } 

现在将尽快处理所有常规消息,而那些需要延迟的消息将被放在另一个主题上。

好消息是我们知道延迟主题头部的消息是应该首先处理的消息,因为它的delayTo值将是最小的。 因此,我们可以设置另一个读取头消息的使用者,检查时间戳是否在过去,如果是,则处理消息并提交偏移量。 如果不是它不提交偏移量而是直到那个时间才睡觉:

 while(it.hasNext()) { val delayedMessage = it.peek().message() if(delayedMessage.delayTo < getCurrentTime()) { val readMessage = it.next().message process(readMessage.originalMessage) consumer.commitOffset() } else { delayProcessingUntil(delayedMessage.delayTo) } } 

如果有不同的延迟时间,您可以在延迟时划分主题(例如24小时,12小时,6小时)。 如果延迟时间比动态更加动态则变得有点复杂。 您可以通过引入两个延迟主题来解决它。 读取延迟主题A中的所有消息并处理其delayTo值过去的所有消息。 在其他人中,您只需找到最接近delayTo ,然后将它们放在主题B 。 睡觉直到应该处理最接近的一个,并反过来完成所有操作,即处理来自主题B消息,并将一次不应该被回溯到主题A

回答您的具体问题(有些问题已在您的问题的评论中得到解决)

  1. 提交每个偏移可能会减慢ZK

您可以考虑切换到在Kafka中存储偏移量(可从0.8.2获得的function,在消费者配置中查看offsets.storage属性)

  1. consumer.commitOffsets会抛出exception吗? 如果是的话,我将两次使用相同的消息(可以用幂等消息解决)

我相信如果它不能与偏移存储器通信,例如。 正如你所说,使用幂等消息解决了这个问题。

  1. 问题等待很长时间没有提交偏移量,例如延迟时间是24小时,将从迭代器获得下一个,睡眠24小时,进程和提交(ZK会话超时?)

除非消息本身的处理花费超过会话超时,否则这对于上述解决方案不会有问题。

  1. ZK会话如何保持活跃而不提交新的偏移量? (设置一个配置单元zookeeper.session.timeout.ms可以解决死亡的消费者而不认识它)

再次使用上述内容,您不需要设置长会话超时。

  1. 我遗失的任何其他问题?

总有;)

在你的案例中,我会建议另一条路线。

解决消费者主线程中的等待时间是没有意义的。 这将是队列使用方式的反模式。 从概念上讲,您需要尽可能快地处理消息,并使队列保持低负载因子。

相反,我会使用一个调度程序来为每个需要延迟的消息安排作业。 这样,您可以处理队列并创建将在预定义时间点触发的异步作业。

使用这种技术的缺点是,对于将预定作业保存在内存中的JVM的状态是明智的。 如果该JVM失败,则会丢失计划的作业,并且您不知道该任务是否已执行。

有调度程序实现,但可以配置为在集群环境中运行,从而使您免受JVM崩溃的影响。

看看这个java调度框架: http : //www.quartz-scheduler.org/

使用Tibco EMS或其他JMS队列。 他们内置了重试延迟。 卡夫卡可能不是您正在做的事情的正确设计选择

按计划列出的关键列表或其redis替代方案可能是最佳方法。