How to get Kafka consumer lag in a Java program

I wrote a Java program that consumes messages from Kafka. I want to monitor the consumer lag — how can I get it from Java?

By the way, I use:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.10.1.1</version>
    </dependency>

Thanks in advance.

Personally, I query the JMX information directly from my consumers. I use only Java, and the JMX bean `kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*/records-lag-max` is available.
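A minimal sketch of that JMX query from inside the same JVM as the consumer (the MBean pattern is the one named above; the class name `ConsumerLagJmx` is made up here, and the query only matches beans when a consumer is actually running, so on a bare JVM it matches nothing):

```java
import java.lang.management.ManagementFactory;
import java.util.Set;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class ConsumerLagJmx {
    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Wildcard pattern matching the fetch-manager metrics bean of every
        // consumer client registered in this JVM
        ObjectName pattern = new ObjectName(
                "kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*");
        Set<ObjectName> names = server.queryNames(pattern, null);
        for (ObjectName name : names) {
            // records-lag-max is an attribute of the fetch-manager metrics bean
            Object lagMax = server.getAttribute(name, "records-lag-max");
            System.out.println(name.getKeyProperty("client-id")
                    + " records-lag-max=" + lagMax);
        }
        System.out.println("Matched " + names.size() + " consumer metric bean(s)");
    }
}
```

For a consumer running in a different process, the same query works over a remote JMX connector instead of the platform MBeanServer.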

If Jolokia is on your classpath, you can retrieve the value with a GET on `/jolokia/read/kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*/records-lag-max` and collect all the results in one place.
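The read URL is just the Jolokia base address followed by the MBean pattern and the attribute name; a small sketch of composing it (the host `app-host:8778` and class name are hypothetical):

```java
public class JolokiaLagUrl {
    // Builds the Jolokia read URL for records-lag-max across all consumer clients
    static String lagUrl(String jolokiaBase) {
        String mbean = "kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*";
        String attribute = "records-lag-max";
        return jolokiaBase + "/jolokia/read/" + mbean + "/" + attribute;
    }

    public static void main(String[] args) {
        // http://app-host:8778 stands in for wherever the Jolokia agent listens
        System.out.println(lagUrl("http://app-host:8778"));
    }
}
```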

There is also Burrow, which is easy to configure, but it is somewhat outdated (if I remember correctly, it does not work with 0.10).

If you don't want to include the kafka (and Scala) dependency in your project, you can use the class below. It uses only the kafka-clients dependency.

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.UUID;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.function.BinaryOperator;
    import java.util.stream.Collectors;

    public class KafkaConsumerMonitor {

        public static class PartionOffsets {
            private long endOffset;
            private long currentOffset;
            private int partion;
            private String topic;

            public PartionOffsets(long endOffset, long currentOffset, int partion, String topic) {
                this.endOffset = endOffset;
                this.currentOffset = currentOffset;
                this.partion = partion;
                this.topic = topic;
            }

            public long getEndOffset() {
                return endOffset;
            }

            public long getCurrentOffset() {
                return currentOffset;
            }

            public int getPartion() {
                return partion;
            }

            public String getTopic() {
                return topic;
            }
        }

        private final String monitoringConsumerGroupID = "monitoring_consumer_" + UUID.randomUUID().toString();

        public Map<TopicPartition, PartionOffsets> getConsumerGroupOffsets(String host, String topic, String groupId) {
            Map<TopicPartition, Long> logEndOffset = getLogEndOffset(topic, host);
            KafkaConsumer<String, String> consumer = createNewConsumer(groupId, host);
            BinaryOperator<PartionOffsets> mergeFunction = (a, b) -> {
                throw new IllegalStateException();
            };
            Map<TopicPartition, PartionOffsets> result = logEndOffset.entrySet()
                    .stream()
                    .collect(Collectors.toMap(
                            entry -> entry.getKey(),
                            entry -> {
                                OffsetAndMetadata committed = consumer.committed(entry.getKey());
                                return new PartionOffsets(entry.getValue(), committed.offset(),
                                        entry.getKey().partition(), topic);
                            }, mergeFunction));
            consumer.close();
            return result;
        }

        public Map<TopicPartition, Long> getLogEndOffset(String topic, String host) {
            Map<TopicPartition, Long> endOffsets = new ConcurrentHashMap<>();
            KafkaConsumer<String, String> consumer = createNewConsumer(monitoringConsumerGroupID, host);
            List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
            List<TopicPartition> topicPartitions = partitionInfoList.stream()
                    .map(pi -> new TopicPartition(topic, pi.partition()))
                    .collect(Collectors.toList());
            consumer.assign(topicPartitions);
            consumer.seekToEnd(topicPartitions);
            topicPartitions.forEach(topicPartition ->
                    endOffsets.put(topicPartition, consumer.position(topicPartition)));
            consumer.close();
            return endOffsets;
        }

        private static KafkaConsumer<String, String> createNewConsumer(String groupId, String host) {
            Properties properties = new Properties();
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, host);
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            return new KafkaConsumer<>(properties);
        }
    }

I am using Spring for my API. With the code below you can get the metrics via Java. The code works.

    @Component
    public class Receiver {

        private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);

        @Autowired
        private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

        public void testlag() {
            for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry
                    .getListenerContainers()) {
                Map<String, Map<MetricName, ? extends Metric>> metrics = messageListenerContainer.metrics();
                metrics.forEach((clientid, metricMap) -> {
                    System.out.println("------------------------For client id : " + clientid);
                    metricMap.forEach((metricName, metricValue) -> {
                        // if (metricName.name().contains("lag"))
                        System.out.println("------------Metric name: " + metricName.name()
                                + "-----------Metric value: " + metricValue.metricValue());
                    });
                });
            }
        }
    }

Try using AdminClient#listGroupOffsets(groupID) to retrieve the offsets of all topic partitions associated with the consumer group. For example:

    AdminClient client = AdminClient.createSimplePlaintext("localhost:9092");
    Map<TopicPartition, Object> offsets = JavaConversions.asJavaMap(
            client.listGroupOffsets("groupID"));
    Long offset = (Long) offsets.get(new TopicPartition("topic", 0));
    ...

EDIT
The snippet above shows how to get the committed offset for a given partition. The code below shows how to retrieve the LEO (log end offset) of a partition.

    public long getLogEndOffset(TopicPartition tp) {
        KafkaConsumer<String, String> consumer = createNewConsumer();
        consumer.assign(Collections.singletonList(tp));
        consumer.seekToEnd(Collections.singletonList(tp));
        return consumer.position(tp);
    }

    private KafkaConsumer<String, String> createNewConsumer() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "g1");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<>(properties);
    }

Calling getLogEndOffset returns the LEO of the given partition; subtract the committed offset from it, and the result is the lag.
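Putting the two pieces together, the lag computation itself is just a subtraction. A minimal sketch with made-up offset values (in practice the LEO would come from getLogEndOffset and the committed offset from listGroupOffsets):

```java
public class LagCalc {
    // lag = log end offset minus committed offset, floored at 0
    // (a committed offset can momentarily exceed a stale LEO reading)
    static long lag(long logEndOffset, long committedOffset) {
        return Math.max(0, logEndOffset - committedOffset);
    }

    public static void main(String[] args) {
        long leo = 1500L;       // hypothetical log end offset for the partition
        long committed = 1350L; // hypothetical committed offset of the group
        System.out.println("lag=" + lag(leo, committed));
    }
}
```

Total group lag for a topic is then the sum of this per-partition value across all partitions.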