Tag: kafka consumer api

为什么Kafka消费者表现缓慢?

我有一个简单的主题,一个简单的Kafka使用者和生产者,使用默认配置。 程序很简单,我有两个线程。 在生产者中,它不断发送16个字节的数据。 在消费者方面,它不断接收。 我发现生产者的吞吐量大约是10MB / s,这很好。 但消费者的吞吐量仅为0.2MB / s。 我已经禁用了所有的调试日志,但这并没有让它变得更好。 测试在本地计算机上运行。 什么机构都知道出了什么问题? 谢谢! 我使用的代码如下:制片人: KafkaProducer producer = new KafkaProducer(props); int size = 16; byte[] payload = new byte[size]; String key = “key”; Arrays.fill(payload, (byte) 1); ProducerRecord record = new ProducerRecord(“test”,0,key.getBytes(),payload); while(true){ producer.send(record); } 消费者: Properties consumerProps = new Properties(); consumerProps.put(“zookeeper.connect”, “localhost:2181”); consumerProps.put(“group.id”, “test”); ConsumerConnector […]

Kafka使用者 – 消费者进程和线程与主题分区的关系是什么

我最近一直在与卡夫卡合作,对消费者群体下的消费者有点困惑。 混淆的中心是将消费者实现为流程还是线程。 对于这个问题,假设我正在使用高级消费者。 让我们考虑一下我尝试过的场景。 在我的主题中有2个分区(为简单起见,我们假设复制因子只有1)。 我创建了一个使用group1组的消费者( ConsumerConnector )进程consumer1 ,然后创建了一个大小为2的主题计数映射,然后在该进程group1成了2个消费者线程consumer1_thread1和consumer1_thread2 。 看起来consumer1_thread1正在消耗分区0而consumer1_thread2正在消耗分区1 。 这种行为总是确定的吗? 以下是代码段。 TestConsumer类是我的消费者线程类。 … Map topicCountMap = new HashMap(); topicCountMap.put(topic, new Integer(2)); Map<String, List<KafkaStream>> consumerMap = consumer.createMessageStreams(topicCountMap); List<KafkaStream> streams = consumerMap.get(topic); executor = Executors.newFixedThreadPool(2); int threadNumber = 0; for (final KafkaStream stream : streams) { executor.submit(new TestConsumer(stream, threadNumber)); threadNumber++; } … 现在,让我们考虑另一个场景(我没有尝试但很好奇),我开始2个消费者进程consumer1和consumer2都具有相同的组group1 ,每个都是单线程进程。 […]

Kafka如何为每个主题存储偏移量?

在轮询Kafka时,我使用subscribe()函数订阅了多个主题。 现在,我想设置我想从每个主题中读取的偏移量,而不是在主题的每个seek()和poll()之后重新订阅。 在轮询数据到达结果之前 ,是否会在每个主题名称上迭代地调用seek() ? 如何在Kafka中准确存储偏移量? 我每个主题都有一个分区,只有一个消费者可以阅读所有主题。

Kafka 0.9.0.1 Java消费者陷入awaitMetadataUpdate()

我正在尝试使用Java API v0.9.0.1让一个简单的Kafka Consumer工作。 我正在使用的kafka服务器是一个docker容器,也运行版本0.9.0.1。 以下是消费者代码: public class Consumer { public static void main(String[] args) throws IOException { KafkaConsumer consumer; try (InputStream props = Resources.getResource(“consumer.props”).openStream()) { Properties properties = new Properties(); properties.load(props); consumer = new KafkaConsumer(properties); } consumer.subscribe(Arrays.asList(“messages”)); try { while (true) { ConsumerRecords records = consumer.poll(100); for (ConsumerRecord record : records) System.out.println(“Message received: ” […]

kafka.consumer.SimpleConsumer:由于套接字错误而重新连接:java.nio.channels.ClosedChannelException

我正在为kafka运行一个简单的消费者,例如: int timeout = 80000; int bufferSize = 64*1024; consumer = new SimpleConsumer(host, port,timeout, bufferSize, clientName); 这运行好几个小时,但我稍后在kafka.consumer.SimpleConsumer上得到一个例外:由于套接字错误重新连接: java.nio.channels.ClosedChannelException 和消费者停止……以前有人遇到过这个问题吗?

KafkaConsumer 0.10 Java API错误消息:没有分区的当前分配

我正在使用KafkaConsumer 0.10 Java api。 我想从特定的分区和特定的偏移消耗。 我抬起头,发现有一个搜索方法,但它抛出exception。 任何人都有类似的用例或解决方案? 码: KafkaConsumer consumer = new KafkaConsumer(consumerProps); consumer.seek(new TopicPartition(“mytopic”, 1), 4); 例外 java.lang.IllegalStateException: No current assignment for partition mytopic-1 at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251) at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276) at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135) at xx.xxx.xxx.Test.main(Test.java:182)

Kafka – 使用高级消费者实现延迟队列

想要使用高级消费者api实现延迟消费者 大意: 按密钥生成消息(每个消息包含创建时间戳)这可确保每个分区按生产时间排序消息。 auto.commit.enable = false(将在每个消息进程后显式提交) 消费一条消息 检查消息时间戳并检查是否已经过了足够的时间 进程消息(此操作永远不会失败) 提交1个偏移量 while (it.hasNext()) { val msg = it.next().message() //checks timestamp in msg to see delay period exceeded while (!delayedPeriodPassed(msg)) { waitSomeTime() //Thread.sleep or something…. } //certain that the msg was delayed and can now be handled Try { process(msg) } //the msg process will never fail […]

如何在kafka中创建自定义序列化程序?

只有很少的序列化器可用,如, org.apache.kafka.common.serialization.StringSerializer org.apache.kafka.common.serialization.StringSerializer 我们如何创建自己的自定义序列化程序?

带解码器问题的Kafka Avro Consumer

当我尝试使用我的相应模式使用Avro运行Kafka Consumer时 ,它返回错误“AvroRuntimeException:格式错误的数据。长度为负:-40”。 我看到其他人有类似的问题,将字节数组转换为json , Avro写入和读取 ,以及Kafka Avro Binary *编码器 。 我也引用了这个消费者组示例 ,它们都很有帮助,但到目前为止这个错误没有任何帮助..它可以工作到这部分代码(第73行) 解码器解码器= DecoderFactory.get()。binaryDecoder(byteArrayInputStream,null); 我已经尝试了其他解码器并打印出byteArrayInputStream变量的内容,看起来我相信你会期望序列化的avro数据看起来(在消息中我可以看到模式和一些数据以及一些格式错误的数据)我打印出来了使用.available()方法可用的字节,返回594.我无法理解为什么会发生此错误。 Apache Nifi用于生成具有来自hdfs的相同模式的Kafka流。 我将不胜感激任何帮助。