Tag: apache kafka

为什么Kafka消费者表现缓慢?

我有一个简单的主题,一个简单的Kafka使用者和生产者,使用默认配置。 程序很简单,我有两个线程。 在生产者中,它不断发送16个字节的数据。 在消费者方面,它不断接收。 我发现生产者的吞吐量大约是10MB / s,这很好。 但消费者的吞吐量仅为0.2MB / s。 我已经禁用了所有的调试日志,但这并没有让它变得更好。 测试在本地计算机上运行。 什么机构都知道出了什么问题? 谢谢! 我使用的代码如下:制片人: KafkaProducer producer = new KafkaProducer(props); int size = 16; byte[] payload = new byte[size]; String key = “key”; Arrays.fill(payload, (byte) 1); ProducerRecord record = new ProducerRecord(“test”,0,key.getBytes(),payload); while(true){ producer.send(record); } 消费者: Properties consumerProps = new Properties(); consumerProps.put(“zookeeper.connect”, “localhost:2181”); consumerProps.put(“group.id”, “test”); ConsumerConnector […]

Kafka使用者 – 消费者进程和线程与主题分区的关系是什么

我最近一直在与卡夫卡合作,对消费者群体下的消费者有点困惑。 混淆的中心是将消费者实现为流程还是线程。 对于这个问题,假设我正在使用高级消费者。 让我们考虑一下我尝试过的场景。 在我的主题中有2个分区(为简单起见,我们假设复制因子只有1)。 我创建了一个使用group1组的消费者( ConsumerConnector )进程consumer1 ,然后创建了一个大小为2的主题计数映射,然后在该进程group1成了2个消费者线程consumer1_thread1和consumer1_thread2 。 看起来consumer1_thread1正在消耗分区0而consumer1_thread2正在消耗分区1 。 这种行为总是确定的吗? 以下是代码段。 TestConsumer类是我的消费者线程类。 … Map topicCountMap = new HashMap(); topicCountMap.put(topic, new Integer(2)); Map<String, List<KafkaStream>> consumerMap = consumer.createMessageStreams(topicCountMap); List<KafkaStream> streams = consumerMap.get(topic); executor = Executors.newFixedThreadPool(2); int threadNumber = 0; for (final KafkaStream stream : streams) { executor.submit(new TestConsumer(stream, threadNumber)); threadNumber++; } … 现在,让我们考虑另一个场景(我没有尝试但很好奇),我开始2个消费者进程consumer1和consumer2都具有相同的组group1 ,每个都是单线程进程。 […]

Kafka如何为每个主题存储偏移量?

在轮询Kafka时,我使用subscribe()函数订阅了多个主题。 现在,我想设置我想从每个主题中读取的偏移量,而不是在主题的每个seek()和poll()之后重新订阅。 在轮询数据到达结果之前 ,是否会在每个主题名称上迭代地调用seek() ? 如何在Kafka中准确存储偏移量? 我每个主题都有一个分区,只有一个消费者可以阅读所有主题。

Kafurn in Kubernetes – 将协调员标记为团体死亡

我对Kubernetes很新,并想用它来设置Kafka和zookeeper。 我能够使用StatefulSets在Kubernetes中设置Apache Kafka和Zookeeper。 我按照这个来构建我的清单文件。 我制作了1张kafka和zookeeper的复制品,并且还使用了持久卷。 所有pod都在运行并准备就绪。 我尝试通过指定nodePort(30010)来公开kafka并使用Service 。 看起来这会将kafka暴露给外界,在那里他们可以向kafka经纪人发送消息并从中消费。 但是在我的Java应用程序中,我创建了一个使用者并将bootstrapServer添加为:30010 ,显示了以下日志: INFO oakcciAbstractCoordinator – Discovered coordinator kafka-0.kafka-hs.default.svc.cluster.local:9093 (id: 2147483647 rack: null) for group workerListener. INFO oakcciAbstractCoordinator – Marking the coordinator kafka-0.kafka-hs.default.svc.cluster.local:9093 (id: 2147483647 rack: null) dead for group workerListener 有趣的是,当我使用kubectl命令测试集群时,我能够生成和使用消息: kubectl run -ti –image=gcr.io/google_containers/kubernetes-kafka:1.0-10.2.1 produce –restart=Never –rm \ — kafka-console-producer.sh –topic test –broker-list kafka-0.kafka-hs.default.svc.cluster.local:9093 done; […]

Apache Kafka – 关于主题/分区的KafkaStream

我正在为高容量高速分布式应用编写Kafka Consumer。 我只有一个主题,但传入消息的速率非常高。 拥有多个服务于更多消费者的分区将适用于此用例。 最好的消费方式是拥有多个流阅读器。 根据文档或可用示例,ConsumerConnector提供的KafkaStream数量基于主题数量。 想知道如何获得多个KafkaStream读取器[基于分区],以便我可以跨每个流跨越一个线程或从多个线程中的相同KafkaStream读取将从多个分区进行并发读取? 任何见解都非常感谢。

Storm-Kafka多个鲸鱼喷水,如何分担负荷?

我试图在多个喷口之间分享任务。 我有一种情况,我从外部源一次得到一个元组/消息,我想要有多个spout实例,主要目的是分担负载并提高性能效率。 我可以用一个Spout本身做同样的事情,但我想分担多个喷口的负载。 我无法获得分散负载的逻辑。 由于消息的偏移在特定喷口完成消耗部件之前将不会被知道(即,基于设置的缓冲器大小)。 任何人都可以对如何计算逻辑/算法有所启发吗? 提前谢谢你的时间。 更新以回答答案: 现在在Kafka上使用多分区(即5 ) 以下是使用的代码: builder.setSpout(“spout”, new KafkaSpout(cfg), 5); 通过在每个分区上使用800 MB数据进行泛洪测试,完成读取需要~22 sec 。 再次,使用parallelism_hint = 1的代码 即builder.setSpout(“spout”, new KafkaSpout(cfg), 1); 现在花了更多~23 sec ! 为什么? 根据Storm Docs的 setSpout()声明如下: public SpoutDeclarer setSpout(java.lang.String id, IRichSpout spout, java.lang.Number parallelism_hint) 哪里, parallelism_hint – 是执行此spout应分配的任务数。 每个任务都将在群集周围某个进程中的某个线程上运行。

kafka 8和内存 – Java Runtime Environment没有足够的内存来继续

我正在使用DigiOcean实例和512兆内存,我用kafka得到了以下错误。 我不是一个java熟练的开发者。 如何调整kafka以利用少量的ram。 这是一个开发者。 我不想每小时为更大的机器支付更多费用。 # # There is insufficient memory for the Java Runtime Environment to continue. # Native memory allocation (malloc) failed to allocate 986513408 bytes for committing reserved memory. # An error report file with more information is saved as: # //hs_err_pid6500.log OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000bad30000, 986513408, 0) failed; […]

如何在Kafka中使用多个消费者?

我是一名学习卡夫卡的新学生,我遇到了一些基本问题,理解多个消费者,文章,文档等对目前来说都没有太大的帮助。 我试图做的一件事是编写我自己的高级Kafka生产者和消费者并同时运行它们,向主题发布100条简单消息并让我的消费者检索它们。 我已经成功地做到了这一点,但是当我尝试引入第二个消费者来消费刚刚发布消息的同一主题时,它不会收到任何消息。 我的理解是,对于每个主题,您可以拥有来自不同消费者群体的消费者,并且每个消费者群体都可以获得针对某个主题生成的消息的完整副本。 它是否正确? 如果没有,那么建立多个消费者的正确方法是什么? 这是我到目前为止写的消费者类: public class AlternateConsumer extends Thread { private final KafkaConsumer consumer; private final String topic; private final Boolean isAsync = false; public AlternateConsumer(String topic, String consumerGroup) { Properties properties = new Properties(); properties.put(“bootstrap.servers”, “localhost:9092”); properties.put(“group.id”, consumerGroup); properties.put(“partition.assignment.strategy”, “roundrobin”); properties.put(“enable.auto.commit”, “true”); properties.put(“auto.commit.interval.ms”, “1000”); properties.put(“session.timeout.ms”, “30000”); properties.put(“key.deserializer”, “org.apache.kafka.common.serialization.IntegerDeserializer”); properties.put(“value.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”); consumer […]

避免apache kafka使用者中重复消息的有效策略

我一直在学习apache kafka一个月了。 然而,我现在陷入了困境。 我的用例是,我有两个或更多的消费者进程在不同的机器上运行。 我运行了一些测试,其中我在kafka服务器上发布了10,000条消息。 然后在处理这些消息时,我杀死了一个消费者进程并重新启动它。 消费者在文件中编写已处理的消息。 消费完成后,文件显示超过10k条消息。 所以有些消息是重复的。 在消费者流程中,我已禁用自动提交。 消费者手动批量提交偏移。 因此,例如,如果将100条消息写入文件,则消费者提交偏移量。 当单个消费者进程正在运行并且以这种方式避免崩溃并恢复重复时。 但是当多个消费者正在运行并且其中一个消失并且恢复时,它会将重复的消息写入文件。 是否有任何有效的策略来避免这些重复的消息?

无法在jConsole中看到kafka.consumer和kafka.producer mBean

编辑: 我知道我应该在我的Consumer和Producer流程中启用JMX并从各个流程获取mBean信息。 我将如何为Kafka和消耗消息的其他Java进程发送消息的Java进程发布消息? 我为Kafka启用了JMX。 但我仍然无法在jConsole中看到这两个mBeans。 我尝试过pub / sub onn Kafka经纪人,但仍然没有结果。 我的步骤如下:在kafka-run-class.sh中添加以下内容: KAFKA_JMX_OPTS=”-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=127.0.1.1 -Djava.net.preferIPv4Stack=true “ 在kafka-server-start.sh中的导出JMX_PORT = $ {JMX_PORT:-9999}中添加了端口