Tag: apache kafka

在卡夫卡创造了多少生产商?

在大量实时Java Web应用程序中,我正在向apache kafka发送消息。 目前我正在发送一个主题,但将来我可能需要向多个主题发送消息。 在这种情况下,我不确定每个主题创建一个制作人的天气,还是我应该使用单个制作人来处理我的所有主题? 这是我的代码: props = new Properties(); props.put(“zk.connect”, :,:,:); props.put(“zk.connectiontimeout.ms”, “1000000”); props.put(“producer.type”, “async”); Producer producer = new kafka.javaapi.producer.Producer(new ProducerConfig(props)); ProducerData producerData1 = new ProducerData(“someTopic1”, messageTosend); ProducerData producerData2 = new ProducerData(“someTopic2”, messageTosend); producer.send(producerData1); producer.send(producerData2); 如您所见,一旦创建了生产者,我就可以使用它将数据发送到不同的主题。 我想知道什么是最佳做法? 如果我的应用程序发送到多个主题(每个主题获得不同的数据)可以/我应该使用单个生成器还是应该创建多个生成器? 什么时候(一般来说)我应该使用多个生产者?

Kafka:如何启用客户端日志记录?

当我实例化Kafka消费者时 KafkaConsumer consumer = new KafkaConsumer(props); 我收到这条消息 SLF4J: Failed to load class “org.slf4j.impl.StaticLoggerBinder”. SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. 如何为客户端程序启用日志记录?

当kafka参与微服务架构时,如何实施合同测试?

我目前正在开发一个项目,我们在微服务架构中实现了kafka。 如果您使用pact-jvm成功创建mS到kafka主题交互的合同测试用例吗? 我的实现是microservice1向REST客户端发布消息,然后REST客户端将消息发布到Kafka主题。 microservice2使用GET方法从Kafka主题中检索消息。

如何获得kafka主题的最新偏移量?

我正在使用Java编写一个kafka消费者。 我想保留消息的实时,所以如果等待消费的消息太多,例如1000或更多,我应该放弃未消耗的消息并开始使用最新的消息。 对于这个问题,我尝试比较最后一个提交的偏移量和一个主题的最新偏移量(只有一个分区),如果这两个偏移量之间的差异大于一定量,我将把主题的最新偏移量设置为下一个偏移,以便我可以放弃那些冗余的消息。 现在我的问题是如何获得一个主题的最新偏移,有人说我可以使用旧的消费者,但它太复杂,新的消费者有这个function吗?

Apache Kafka和Avro:org.apache.avro.generic.GenericData $ Record无法强制转换为com.harmeetsingh13.java.Customer

每当我尝试从kafka队列中读取消息时,我都会遇到以下exception: [error] (run-main-0) java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.harmeetsingh13.java.Customer java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.harmeetsingh13.java.Customer at com.harmeetsingh13.java.consumers.avrodesrializer.AvroSpecificDeserializer.infiniteConsumer(AvroSpecificDeserializer.java:79) at com.harmeetsingh13.java.consumers.avrodesrializer.AvroSpecificDeserializer.main(AvroSpecificDeserializer.java:87) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) 卡夫卡制片人代码: public class AvroSpecificProducer { private static Properties kafkaProps = new Properties(); private static KafkaProducer kafkaProducer; static { kafkaProps.put(“bootstrap.servers”, “localhost:9092”); kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class); kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class); kafkaProps.put(“schema.registry.url”, […]

Kafka Java使用者永远不会收到任何消息

我正在尝试设置一个基本的Java使用者来接收来自Kafka主题的消息。 我已经按照以下示例访问了 – https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example – 并且拥有以下代码: package org.example.kafka.client; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import kafka.consumer.ConsumerConfig; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; public class KafkaClientMain { private final ConsumerConnector consumer; private final String topic; private ExecutorService executor; public KafkaClientMain(String a_zookeeper, String a_groupId, String a_topic) { this.consumer = kafka.consumer.Consumer.createJavaConsumerConnector( createConsumerConfig(a_zookeeper, a_groupId)); this.topic […]

Kafka 0.10 Java客户端TimeoutException:包含1条记录的批处理已过期

我有一个单节点,多(3)个代理Zookeeper / Kafka设置。 我正在使用Kafka 0.10 Java客户端。 我写了以下简单的远程(在与Kafka不同的服务器上)生产者(在代码中我用MYIP替换了我的公共IP地址): Properties config = new Properties(); try { config.put(ProducerConfig.CLIENT_ID_CONFIG, InetAddress.getLocalHost().getHostName()); config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, “MYIP:9092, MYIP:9093, MYIP:9094”); config.put(ProducerConfig.ACKS_CONFIG, “all”); config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, “org.apache.kafka.common.serialization.StringSerializer”); config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, “org.apache.kafka.common.serialization.ByteArraySerializer”); producer = new KafkaProducer(config); Schema.Parser parser = new Schema.Parser(); schema = parser.parse(GATEWAY_SCHEMA); recordInjection = GenericAvroCodecs.toBinary(schema); GenericData.Record avroRecord = new GenericData.Record(schema); //Filling in avroRecord (code not here) byte[] bytes = […]

卡夫卡:不能创建多个流消费者

我刚刚起步并运行Kafka 0.8 beta 1.我有一个非常简单的示例启动和运行,问题是,我只能让一个消息消费者工作,而不是几个。 也就是说,runSingleWorker()方法工作。 run()方法不起作用: import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.consumer.ConsumerConfig; import kafka.javaapi.consumer.ConsumerConnector; import java.util.Map; import java.util.List; import java.util.HashMap; import java.util.concurrent.Executors; import java.util.concurrent.ExecutorService; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import com.truecar.inventory.worker.core.application.config.AppConfig; public class ConsumerThreadPool { private final ConsumerConnector consumer; private final String topic; private ExecutorService executor; private static ApplicationContext context = new AnnotationConfigApplicationContext(AppConfig.class); public ConsumerThreadPool(String topic) […]

KafkaAvroDeserializer不返回SpecificRecord但返回GenericRecord

我的KafkaProducer能够使用KafkaAvroSerializer将对象序列化为我的主题。 但是, KafkaConsumer.poll()返回反序列化的GenericRecord而不是我的序列化类。 MyKafkaProducer KafkaProducer producer; try (InputStream props = Resources.getResource(“producer.props”).openStream()) { Properties properties = new Properties(); properties.load(props); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroSerializer.class); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroSerializer.class); properties.put(“schema.registry.url”, “http://localhost:8081”); MyBean bean = new MyBean(); producer = new KafkaProducer(properties); producer.send(new ProducerRecord(topic, bean.getId(), bean)); 我的KafkaConsumer try (InputStream props = Resources.getResource(“consumer.props”).openStream()) { properties.load(props); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class); properties.put(“schema.registry.url”, “http://localhost:8081”); consumer = new KafkaConsumer(properties); […]

我在哪里可以找到kafka的maven存储库?

我想尝试kafka 0.8(据我所知它已经发布)。 但是我在哪里可以找到kafka maven存储库。 我应该添加哪些额外的存储库URL? 我找到了一些博客 org.apache.kafka kafka_2.8.0 0.8.0-SHA 但它不起作用。 我正在寻找合适的maven依赖。 或者我应该从git中检出它并部署在我们的内部神器中?