Tag: kafka consumer api

如何暂停卡夫卡消费者?

我在我的框架中使用Kafka生产者 – 消费者模型。 消费者端消耗的记录随后被索引到elasticsearch上。 在这里我有一个用例,如果ES关闭,我将不得不暂停kafka消费者,直到ES启动,一旦启动,我需要恢复消费者并消耗我上次离开的记录。 我不认为这可以用@KafkaListener实现。 有谁能请给我一个解决方案吗? 我想我需要为此编写自己的KafkaListenerContainer,但我无法正确实现它。 任何帮助将非常感激。

如何动态地将主题传递给kafka监听器?

几天后,我正在尝试将主题动态传递给Kafka监听器,而不是通过Java DSL中的密钥使用它们。 周围的人之前做过这个或者可以说明实现这个目标的最佳方法是什么?

kafka-connect错误:无法找到或加载主类

我正在遵循官方文档来实现 kakf-connect以从文件中读取数据。 我有完美的kafka跑步。 生产者和消费者发送和接收消息。 但是,当我运行以下命令时: sudo ./bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties 我收到以下错误: 错误:无法找到或加载主类org.apache.kafka.connect.cli.ConnectStandalone 我交叉检查,我有ConnectStandalone文件connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone 。 我的connect-file-source.properties如下: name=local-file-source connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector tasks.max=1 file=test.txt topic=spark-kafka 有什么遗失的吗? 我该怎么做才能摆脱这个错误?

spring boot kafka consumer – 如何从spring boot中正确使用kafka消息

我正在开发一个弹簧启动应用程序,它假设使用kafka消息。 我有一个奇怪的结果:当我使用kafka-console-producer.sh发送消息时,我的消费者只检测并打印另一条消息。 例如 – 在kafka控制台生产者中,我输入“one” – > Enter – >“two” – > Enter – >“three” – > Enter。 在我的春季靴子消费者中,我只会看到“两个”,“四个”等… 我的ConsumeConfigFactory.java: import java.util.Properties; import javax.annotation.PostConstruct; import kafka.consumer.ConsumerConfig; import org.springframework.stereotype.Component; @Component public class ConsumerConfigFactory { private static final String ZK_CONNECT = “10.211.55.2:2181”; private ConsumerConfig consumerConfig; @PostConstruct private void createConsumerConfig() { Properties props = new Properties(); props.put(“zookeeper.connect”, ZK_CONNECT); […]

kafka使用者动态检测添加的主题

我正在使用KafkaConsumer来消费来自Kafka服务器(主题)的消息。 它适用于在启动消费者代码之前创建的主题… 但问题是,如果动态创建的主题(我的意思是说消费者代码开始之后),它将无法工作,但API表示它将支持动态主题创建..这是您的参考链接.. 使用的Kafka版本:0.9.0.1 https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html 这是JAVA代码…… Properties props = new Properties(); props.put(“bootstrap.servers”, “localhost:9092”); props.put(“group.id”, “test”); props.put(“enable.auto.commit”, “false”); props.put(“auto.commit.interval.ms”, “1000”); props.put(“session.timeout.ms”, “30000”); props.put(“key.deserializer”,”org.apache.kafka.common.serialization.StringDeserializer”); props.put(“value.deserializer”,”org.apache.kafka.common.serialization.StringDeserializer”); KafkaConsumer consumer = new KafkaConsumer(props); Pattern r = Pattern.compile(“siddu(\\d)*”); consumer.subscribe(r, new HandleRebalance()); try { while(true) { ConsumerRecords records = consumer.poll(Long.MAX_VALUE); for (TopicPartition partition : records.partitions()) { List<ConsumerRecord> partitionRecords = records.records(partition); for (ConsumerRecord […]

卡夫卡模式订阅。 新主题没有触发重新平衡

根据关于kafka javadocs的文档,如果我: 订阅模式 创建与模式匹配的主题 应该发生重新平衡,这使得消费者从该新主题中读取。 但那并没有发生。 如果我停止并启动消费者,它确实会选择新主题。 所以我知道新主题与模式匹配。 在https://stackoverflow.com/questions/37120537/whitelist-filter-in-kafka-doesnt-pick-up-new-topics中可能存在此问题的重复,但这个问题无处可去。 我看到kafka日志并没有错误,它只是不会触发重新平衡。 当消费者加入或死亡时触发重新平衡,但是在创建新主题时不会触发(即使将分区添加到现有主题,但这是另一个主题)。 我正在使用kafka 0.10.0.0和“新消费者API”的官方Java客户端,意思是代理GroupCoordinator而不是胖客户端+ zookeeper。 这是示例消费者的代码: public class SampleConsumer { public static void main(String[] args) throws IOException { KafkaConsumer consumer; try (InputStream props = Resources.getResource(“consumer.props”).openStream()) { Properties properties = new Properties(); properties.load(props); properties.setProperty(“group.id”, “my-group”); System.out.println(properties.get(“group.id”)); consumer = new KafkaConsumer(properties); } Pattern pattern = Pattern.compile(“mytopic.+”); consumer.subscribe(pattern, new […]

Kafka消费者(0.8.2.2)可以批量阅读消息吗?

根据我的理解,Kafka消费者按顺序从指定的分区读取消息… 我们计划拥有多个Kafka使用者(Java),它们具有相同的组I ..如果它从指定的分区顺序读取,那么我们如何实现高吞吐量..例如,生产者发布消息,如每秒40个。消费者流程每秒消息1 ..虽然我们可以有多个消费者,但不能有40 rt ??? 如我错了请纠正我… 在我们的情况下,消费者只有在消息成功处理后才能提交偏移..这些消息将被重新处理…有没有更好的解决方案???

Kafka消费者配置/性能问题

我正在尝试将kafka作为AWS SQS的替代品。 动机主要是提高性能,其中kafka将消除限制,一次性提取10条消息,上限为256kb。 这是我的用例的高级场景。 我有一堆爬虫正在发送索引文件。 有效载荷的大小平均约为1mb。 爬虫调用SOAP端点,后者又运行生产者代码以将消息提交给kafka队列。 消费者应用程序获取消息并处理它们。 对于我的测试框,我已经为主题配置了30个分区和2个复制。 两个kafka实例正在运行1个zookeeper实例。 卡夫卡版本是0.10.0。 对于我的测试,我在队列中发布了700万条消息。 我创建了一个包含30个消费者线程的消费者组,每个分区一个。 我最初的印象是,与通过SQS获得的相比,这将大大加快处理能力。 不幸的是,事实并非如此。 就我而言,数据处理很复杂,平均需要1-2分钟才能完成。这导致了一系列的分区重新平衡,因为线程无法按时心跳。 我可以在日志引用中看到一堆消息 组full_group的自动偏移提交失败:由于组已经重新平衡并将分区分配给另一个成员,因此无法完成提交。 这意味着后续调用poll()之间的时间比配置的session.timeout.ms长,这通常意味着轮询循环花费了太多时间进行消息处理。 您可以通过增加会话超时或通过max.poll.records减少poll()中返回的批量的最大大小来解决此问题。 这导致多次处理相同的消息。 我尝试使用会话超时,max.poll.records和轮询时间来避免这种情况,但这会减慢整个处理时间。 这是一些配置参数。 metadata.max.age.ms = 300000 max.partition.fetch.bytes = 1048576 bootstrap.servers = [kafkahost1:9092, kafkahost2:9092] enable.auto.commit = true max.poll.records = 10000 request.timeout.ms = 310000 heartbeat.interval.ms = 100000 auto.commit.interval.ms = 1000 receive.buffer.bytes = 65536 fetch.min.bytes = 1 send.buffer.bytes […]

如何获得kafka主题的最新偏移量?

我正在使用Java编写一个kafka消费者。 我想保留消息的实时,所以如果等待消费的消息太多,例如1000或更多,我应该放弃未消耗的消息并开始使用最新的消息。 对于这个问题,我尝试比较最后一个提交的偏移量和一个主题的最新偏移量(只有一个分区),如果这两个偏移量之间的差异大于一定量,我将把主题的最新偏移量设置为下一个偏移,以便我可以放弃那些冗余的消息。 现在我的问题是如何获得一个主题的最新偏移,有人说我可以使用旧的消费者,但它太复杂,新的消费者有这个function吗?

Apache Kafka和Avro:org.apache.avro.generic.GenericData $ Record无法强制转换为com.harmeetsingh13.java.Customer

每当我尝试从kafka队列中读取消息时,我都会遇到以下exception: [error] (run-main-0) java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.harmeetsingh13.java.Customer java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.harmeetsingh13.java.Customer at com.harmeetsingh13.java.consumers.avrodesrializer.AvroSpecificDeserializer.infiniteConsumer(AvroSpecificDeserializer.java:79) at com.harmeetsingh13.java.consumers.avrodesrializer.AvroSpecificDeserializer.main(AvroSpecificDeserializer.java:87) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) 卡夫卡制片人代码: public class AvroSpecificProducer { private static Properties kafkaProps = new Properties(); private static KafkaProducer kafkaProducer; static { kafkaProps.put(“bootstrap.servers”, “localhost:9092”); kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class); kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class); kafkaProps.put(“schema.registry.url”, […]