Kafka如何为每个主题存储偏移量?
在轮询Kafka时,我使用subscribe()
函数订阅了多个主题。 现在,我想设置我想从每个主题中读取的偏移量,而不是在主题的每个seek()
和poll()
之后重新订阅。 在轮询数据到达结果之前 ,是否会在每个主题名称上迭代地调用seek()
? 如何在Kafka中准确存储偏移量?
我每个主题都有一个分区,只有一个消费者可以阅读所有主题。
每个主题的KAFKA商店如何抵消?
卡夫卡已将胶印仓库从动物园管理员转移到卡夫卡经纪人。 原因如下:
Zookeeper不是服务高写入负载(例如偏移更新)的好方法,因为zookeeper路由每个节点通过每个节点写入,因此无法分区或以其他方式扩展写入。 我们一直都知道这一点,但选择这种实现作为一种“方便的结合”,因为我们已经依赖于zk。
Kafka将偏移提交存储在主题中,当使用者提交偏移量时,kafka将提交偏移消息发布到“commit-log”主题并保留内容结构,将组/主题/分区映射到最新偏移量以便快速检索。 可以在此找到更多的设计信息。
现在,我想设置我想从每个主题中读取的偏移量,而不是在主题的每个seek()和poll()之后重新订阅。
kafka管理工具有一个新function可以重置偏移量。
kafka-consumer-group.sh –bootstrap-server 127.0.0.1:9092 –group your-consumer-group –reset-offsets –to-offset 1 –all-topics –execute
您可以参考的更多选项https://cwiki.apache.org/confluence/display/KAFKA/KIP-122%3A+Add+Reset+Consumer+Group+Offsets+tooling
实际上是Zookeeper可以保存偏移量
https://www.quora.com/What-is-the-actual-role-of-Zookeeper-in-Kafka-What-benefits-will-I-miss-out-on-if-I-don%E2% 80%99吨使用-动物园管理员和-卡夫卡在一起
然后,在使用者和特定的group_id中 ,您可以选择从特定主题中读取,如下所示:
- 主题中的所有消息
- 所有新消息
为此您可以使用该属性:
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
其他选项不是“最早的”,而是:
- 最早 :自动将偏移重置为最早的偏移量
- 最新 :自动将偏移重置为最新的偏移量
- none:如果未找到先前的偏移量或消费者的组,则向使用者抛出exception
- 其他:向消费者抛出exception。
这是Kafka消费者的样本:
import java.util.Properties; import java.util.Arrays; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerConfig; public class ConsumerGroup { public static void main(String[] args) throws Exception { if(args.length < 2){ System.out.println("Usage: consumer "); return; } String topic = args[0].toString(); String group = args[1].toString(); Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", group); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put("key.deserializer", "org.apache.kafka.common.serializa-tion.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serializa-tion.StringDeserializer"); KafkaConsumer consumer = new KafkaConsumer(props); consumer.subscribe(Arrays.asList(topic)); System.out.println("Subscribed to topic " + topic); int i = 0; while (true) { ConsumerRecords records = con-sumer.poll(100); for (ConsumerRecord record : records) System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value()); } } }
Kafka的抵消存储在消费者一方。 每个消费者都会存储每个主题的偏移量,通常在zookeeper中。