Kafka使用者 – 消费者进程和线程与主题分区的关系是什么

我最近一直在与卡夫卡合作,对消费者群体下的消费者有点困惑。 混淆的中心是将消费者实现为流程还是线程。 对于这个问题,假设我正在使用高级消费者。

让我们考虑一下我尝试过的场景。 在我的主题中有2个分区(为简单起见,我们假设复制因子只有1)。 我创建了一个使用group1组的消费者( ConsumerConnector )进程consumer1 ,然后创建了一个大小为2的主题计数映射,然后在该进程group1成了2个消费者线程consumer1_thread1consumer1_thread2 。 看起来consumer1_thread1正在消耗分区0consumer1_thread2正在消耗分区1 。 这种行为总是确定的吗? 以下是代码段。 TestConsumer类是我的消费者线程类。

  ... Map topicCountMap = new HashMap(); topicCountMap.put(topic, new Integer(2)); Map<String, List<KafkaStream>> consumerMap = consumer.createMessageStreams(topicCountMap); List<KafkaStream> streams = consumerMap.get(topic); executor = Executors.newFixedThreadPool(2); int threadNumber = 0; for (final KafkaStream stream : streams) { executor.submit(new TestConsumer(stream, threadNumber)); threadNumber++; } ... 

现在,让我们考虑另一个场景(我没有尝试但很好奇),我开始2个消费者进程consumer1consumer2都具有相同的组group1 ,每个都是单线程进程。 现在我的问题是:

  1. 在这种情况下,两个独立的消费者流程(在同一组下)是如何与分区相关的? 它与上述单进程multithreading场景有何不同?

  2. 通常,消费者线程或进程如何映射/与主题中的分区相关?

  3. Kafka文档确实说消费者组下的每个消费者将使用一个分区。 但是,这是指消费者线程(如我上面的代码示例)还是独立的消费者流程?

  4. 关于将消费者作为进程与线程实现,我在这里缺少任何微妙的东西吗? 提前致谢。

消费者组可以运行多个消费者实例(具有相同group-id多个进程)。 虽然消耗每个分区,但该组中只有一个消费者实例正在使用它

例如,如果您的主题包含2个分区,并且您启动了具有2个使用者实例的使用者组group-A ,那么每个分区将使用来自该主题的特定分区的消息。

如果您启动具有不同组ID group-Agroup-B的相同2个使用者,则来自该主题的两个分区的消息将被广播到它们中的每一个。 因此,在这种情况下,在group-A下运行的消费者实例将具有来自该主题的两个分区的消息,对于group-B也是如此。

在他们的文档上阅读更多相关内容

编辑 :根据您的评论说,

我想知道在同一个流程下拥有2个消费者线程与2个消费者流程之间的有效区别是什么(在两种情况下组都相同)

消费者group-id在整个群集中是相同/全局的。 假设你已经启动了一个有2个线程的进程 – 然后生成另一个进程(可能在不同的机器中),同一个groupId有2个以上的线程,那么kafka将添加这2个新线程来使用来自该主题的消息。 因此最终将有4个线程负责从同一主题消费。 然后,Kafka将触发重新平衡以将分区重新分配给线程,因此可能发生对于由T1 of process P1的线程T1 of process P1消耗的特定分区可以被分配以由T2 of process P2线程T2 of process P2消耗。 以下几行来自维基页面

当使用相同的使用者组名称启动新进程时,Kafka会将该进程的线程添加到可用于使用Topic并触发“重新平衡”的线程集。 在此重新平衡期间,Kafka会将可用分区分配给可用线程,可能会将分区移动到另一个进程。 如果您混合使用新旧业务逻辑,则某些消息可能会转到旧逻辑。

选择具有相同id而不是单个使用者组实例的多个使用者组实例的主要设计决策是弹性。 例如,如果您有一个具有两个线程的消费者,那么如果该机器出现故障,您将失去所有消费者。 如果您有两个具有相同ID的单独的使用者组,每个用户组在不同的主机上,则它们可以在失败时存活 理想情况下,每个消费者组在上面应该有两个线程,因此如果一个主机发生故障,另一个消费者组使用其hibernate线程来占用另一个分区。 实际上,总是希望拥有比分区更多的线程来覆盖这个因素。

  1. 您可以在不同的主机上运行每个使用者组。 对于给定名称/ id的单个使用者组,它将仅在单个主机上运行,​​因为它在单个运行时环境中管理其所有线程。
  2. Kafka有一个算法来确定哪些线程/消费者组读取各种主题分区。 卡夫卡试图以有弹性的方式均匀地分配这些。 当使用者组失败时,它使其他组中的其他线程能够读取给定的分区。
  3. 指消费者组中的单个线程。 如果线程多于分区,那么其中一些线程将保持hibernate状态,直到其他线程无法提供弹性。
  4. 偏好与弹性有关。 因此,对于具有相同ID的多个使用者组,我可以在多个主机上运行,​​使我的应用程序能够容忍失败。