如何获得kafka主题的最新偏移量?

我正在使用Java编写一个kafka消费者。 我想保留消息的实时,所以如果等待消费的消息太多,例如1000或更多,我应该放弃未消耗的消息并开始使用最新的消息。

对于这个问题,我尝试比较最后一个提交的偏移量和一个主题的最新偏移量(只有一个分区),如果这两个偏移量之间的差异大于一定量,我将把主题的最新偏移量设置为下一个偏移,以便我可以放弃那些冗余的消息。

现在我的问题是如何获得一个主题的最新偏移,有人说我可以使用旧的消费者,但它太复杂,新的消费者有这个function吗?

新消费者也很复杂。

//assign the topic consumer.assign();

//seek to end of the topic consumer.seekToEnd();

//the position is the latest offset consumer.position();

对于Kafka版本:0.10.1.1

 // Get the diff of current position and latest offset Set partitions = new HashSet(); TopicPartition actualTopicPartition = new TopicPartition(record.topic(), record.partition()); partitions.add(actualTopicPartition); Long actualEndOffset = this.consumer.endOffsets(partitions).get(actualTopicPartition); long actualPosition = consumer.position(actualTopicPartition); System.out.println(String.format("diff: %s (actualEndOffset:%s; actualPosition=%s)", actualEndOffset -actualPosition ,actualEndOffset, actualPosition)); 

您还可以使用kafka服务器命令行工具:

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic topic-name

 KafkaConsumer consumer = ... consumer.subscribe(Collections.singletonList(topic)); TopicPartition topicPartition = new TopicPartition(topic, partition); consumer.poll(0); consumer.seekToEnd(Collections.singletonList(topicPartition)); long currentOffset = consumer.position(topicPartition) -1; 

上面的代码段返回给定主题和分区号的当前已提交消息偏移量。