Kafka如何为每个主题存储偏移量?

在轮询Kafka时,我使用subscribe()函数订阅了多个主题。 现在,我想设置我想从每个主题中读取的偏移量,而不是在主题的每个seek()poll()之后重新订阅。 在轮询数据到达结果之前 ,是否会每个主题名称上迭代地调用seek() ? 如何在Kafka中准确存储偏移量?

我每个主题都有一个分区,只有一个消费者可以阅读所有主题。

每个主题的KAFKA商店如何抵消?

卡夫卡已将胶印仓库从动物园管理员转移到卡夫卡经纪人。 原因如下:

Zookeeper不是服务高写入负载(例如偏移更新)的好方法,因为zookeeper路由每个节点通过每个节点写入,因此无法分区或以其他方式扩展写入。 我们一直都知道这一点,但选择这种实现作为一种“方便的结合”,因为我们已经依赖于zk。

Kafka将偏移提交存储在主题中,当使用者提交偏移量时,kafka将提交偏移消息发布到“commit-log”主题并保留内容结构,将组/主题/分区映射到最新偏移量以便快速检索。 可以在此找到更多的设计信息。

现在,我想设置我想从每个主题中读取的偏移量,而不是在主题的每个seek()和poll()之后重新订阅。

kafka管理工具有一个新function可以重置偏移量。

kafka-consumer-group.sh –bootstrap-server 127.0.0.1:9092 –group your-consumer-group –reset-offsets –to-offset 1 –all-topics –execute

您可以参考的更多选项https://cwiki.apache.org/confluence/display/KAFKA/KIP-122%3A+Add+Reset+Consumer+Group+Offsets+tooling

实际上是Zookeeper可以保存偏移量

https://www.quora.com/What-is-the-actual-role-of-Zookeeper-in-Kafka-What-benefits-will-I-miss-out-on-if-I-don%E2% 80%99吨使用-动物园管理员和-卡夫卡在一起

然后,在使用者和特定的group_id中 ,您可以选择从特定主题中读取,如下所示:

  • 主题中的所有消息
  • 所有新消息

为此您可以使用该属性:

 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 

其他选项不是“最早的”,而是:

  • 最早 :自动将偏移重置为最早的偏移量
  • 最新 :自动将偏移重置为最新的偏移量
  • none:如果未找到先前的偏移量或消费者的组,则向使用者抛出exception
  • 其他:向消费者抛出exception。

这是Kafka消费者的样本:

 import java.util.Properties; import java.util.Arrays; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerConfig; public class ConsumerGroup { public static void main(String[] args) throws Exception { if(args.length < 2){ System.out.println("Usage: consumer  "); return; } String topic = args[0].toString(); String group = args[1].toString(); Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", group); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put("key.deserializer", "org.apache.kafka.common.serializa-tion.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serializa-tion.StringDeserializer"); KafkaConsumer consumer = new KafkaConsumer(props); consumer.subscribe(Arrays.asList(topic)); System.out.println("Subscribed to topic " + topic); int i = 0; while (true) { ConsumerRecords records = con-sumer.poll(100); for (ConsumerRecord record : records) System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value()); } } } 

Kafka的抵消存储在消费者一方。 每个消费者都会存储每个主题的偏移量,通常在zookeeper中。