Tag: apache kafka

从0.7升级到0.8.1.1后生成嵌入式kafka队列时出错

我找不到任何可以直接处理我所遇到的问题的东西,所以我在这里发帖。 我有JUnit / JBehave测试,它们启动了嵌入式ZooKeeper服务器,嵌入式Kafka服务器以及kafka生产者和消费者。 将kafka从0.7升级到0.8.1.1后,我遇到以下类型的错误: ERROR [kafka-request-handler-5] state.change.logger – Error on broker 1 while processing LeaderAndIsr request correlationId 7 received from controller 1 epoch 1 for partition [topicName,8] java.lang.NullPointerException: null at kafka.log.Log.(Log.scala:60) ~[kafka_2.10-0.8.1.1.jar:na] at kafka.log.LogManager.createLog(LogManager.scala:265) ~[kafka_2.10-0.8.1.1.jar:na] at kafka.cluster.Partition.getOrCreateReplica(Partition.scala:90) ~[kafka_2.10-0.8.1.1.jar:na] at kafka.cluster.Partition$$anonfun$makeLeader$2.apply(Partition.scala:175) ~[kafka_2.10-0.8.1.1.jar:na] at kafka.cluster.Partition$$anonfun$makeLeader$2.apply(Partition.scala:175) ~[kafka_2.10-0.8.1.1.jar:na] at scala.collection.immutable.Set$Set1.foreach(Set.scala:74) ~[scala-library-2.10.4.jar:na] at kafka.cluster.Partition.makeLeader(Partition.scala:175) ~[kafka_2.10-0.8.1.1.jar:na] at kafka.server.ReplicaManager$$anonfun$makeLeaders$5.apply(ReplicaManager.scala:305) ~[kafka_2.10-0.8.1.1.jar:na] at […]

Kafka Log4j appender没有发送消息

我是一个新的ot apache Kafka和log4j。 我正在尝试将我的日志消息发送到Kafka。 这是我的log4j属性文件 log4j.rootLogger=INFO, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L %% %m%n log4j.appender.KAFKA=kafka.producer.KafkaLog4jAppender log4j.appender.KAFKA.BrokerList=localhost:9092 log4j.appender.KAFKA.Topic=kfkLogs log4j.appender.KAFKA.SerializerClass=kafka.producer.DefaultStringEncoder log4j.appender.KAFKA.layout=org.apache.log4j.PatternLayout log4j.appender.KAFKA.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L %% – %m%n log4j.logger.logGen=DEBUG, KAFKA 但是,我无法在我的消费者中收到任何消息。 我用其他一些生产者代码测试了消费者,它运行正常。 另外,我收到了这个警告 log4j:WARN No such property [serializerClass] in kafka.producer.KafkaLog4jAppender. 编辑 这是生成我的日志消息的代码 package logGen; import org.apache.log4j.Logger; public class TestLog4j { static Logger log = Logger.getLogger(TestLog4j.class.getName()); public […]

带有transactionIdPrefix的DefaultKafkaProducerFactory会在引导服务器关闭时等待

HY, 我正在使用spring-kafka 1.3.0.RELEASE创建一个事务生成器。 当引导服务器关闭时,DefaultKafkaProducerFactory会无休止地等待,直到引导服务器启动。 我究竟做错了什么 ? 我可以设置超时和/或其他类似属性吗? 这是我重现场景的代码示例: public static void main(String[] args) { final DefaultKafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory(producerConfigs()); producerFactory.setTransactionIdPrefix(“transactionIdPrefix”); final Producer producer = producerFactory.createProducer(); System.out.println(“Created producer:” + producer); } private static Map producerConfigs() { final Map props = new HashMap(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, “192.168.56.1:9092”); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, “org.apache.kafka.common.serialization.StringSerializer”); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, “org.apache.kafka.common.serialization.StringSerializer”); props.put(ProducerConfig.RETRIES_CONFIG, 1); props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 1000); props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, […]

Kafka – 经纪人:集团协调员不可用

我有以下结构: zookeeper: 3.4.12 kafka: kafka_2.11-1.1.0 server1: zookeeper + kafka server2: zookeeper + kafka server3: zookeeper + kafka 使用kafka-topics shell脚本创建具有复制因子3和分区3的主题。 ./kafka-topics.sh –create –zookeeper localhost:2181 –topic test-flow –partitions 3 –replication-factor 3 并使用group localConsumers。 当领导没事的时候它工作正常。 ./kafka-topics.sh –describe –zookeeper localhost:2181 –topic test-flow Topic:test-flow PartitionCount:3 ReplicationFactor:3 Configs: Topic: test-flow Partition: 0 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1 Topic: test-flow Partition: […]

kafka使用者动态检测添加的主题

我正在使用KafkaConsumer来消费来自Kafka服务器(主题)的消息。 它适用于在启动消费者代码之前创建的主题… 但问题是,如果动态创建的主题(我的意思是说消费者代码开始之后),它将无法工作,但API表示它将支持动态主题创建..这是您的参考链接.. 使用的Kafka版本:0.9.0.1 https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html 这是JAVA代码…… Properties props = new Properties(); props.put(“bootstrap.servers”, “localhost:9092”); props.put(“group.id”, “test”); props.put(“enable.auto.commit”, “false”); props.put(“auto.commit.interval.ms”, “1000”); props.put(“session.timeout.ms”, “30000”); props.put(“key.deserializer”,”org.apache.kafka.common.serialization.StringDeserializer”); props.put(“value.deserializer”,”org.apache.kafka.common.serialization.StringDeserializer”); KafkaConsumer consumer = new KafkaConsumer(props); Pattern r = Pattern.compile(“siddu(\\d)*”); consumer.subscribe(r, new HandleRebalance()); try { while(true) { ConsumerRecords records = consumer.poll(Long.MAX_VALUE); for (TopicPartition partition : records.partitions()) { List<ConsumerRecord> partitionRecords = records.records(partition); for (ConsumerRecord […]

在Kafka Streams中反序列化POJO

我的Kafka主题包含此格式的消息 user1,subject1,80|user1,subject2,90 user2,subject1,70|user2,subject2,100 and so on. 我创建了用户POJO如下。 class User implements Serializable{ /** * */ private static final long serialVersionUID = -253687203767610477L; private String userId; private String subject; private String marks; public User(String userId, String subject, String marks) { super(); this.userId = userId; this.subject = subject; this.marks = marks; } public String getUserId() { return userId; […]

Kafka制作人发送无效字符

使用以下代码,我发送Elasticsearch文档以进行索引。 我尝试将基本对象转换为JSON并通过制作人发送。 但是,每条消息(从控制台检查)都附加了像 – t {“productId”:2455这样的乱码字符 public boolean sendMessage() { PageRequest page = new PageRequest(0, 1); Product p = product.findByName(“Cream”, page).getContent().get(0); String json = “”; ObjectMapper mapper = new ObjectMapper(); try { json = mapper.writeValueAsString(p); } catch (JsonProcessingException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } logger.info(“JSON = ” + json); boolean status = […]

Kafka Streams:错误退出的正确方法

我已经成功地获得了一个使用,转换和生成数据的流应用程序,但我注意到,流处理器会定期转换到ERROR状态,并且该进程将在不退出的情况下坐在那里。 显示我的日志: All stream threads have died. The instance will be in error state and should be closed. 有没有办法告诉Streams应用程序一旦达到ERROR状态就退出? 也许是各种监视器线程? 我看到Kafka Streams代码的注释中的引用给需要在应用程序达到此状态时关闭应用程序的用户,但是,我无法在文档中找到提及此任务的内容。 有一个简单的方法来执行此关闭步骤吗? 可能是错误的方式可能关闭错误 我的目的是在KafkaStreams对象上设置UncaughtExceptionHandler方法,以执行以下操作: 记录错误 使用原始KafkaStreams对象上的close方法关闭流 结果是: 记录exception消息 INFO org.apache.kafka.streams.KafkaStreams … State transition from ERROR to PENDING_SHUTDOWN INFO org.apache.kafka.streams.processor.internals.StreamThread … Informed to shut down 然后,不幸的是,这个过程似乎没有退出。 FWIW我觉得这可能是对setUncaughtExceptionHandler的误用

卡夫卡模式订阅。 新主题没有触发重新平衡

根据关于kafka javadocs的文档,如果我: 订阅模式 创建与模式匹配的主题 应该发生重新平衡,这使得消费者从该新主题中读取。 但那并没有发生。 如果我停止并启动消费者,它确实会选择新主题。 所以我知道新主题与模式匹配。 在https://stackoverflow.com/questions/37120537/whitelist-filter-in-kafka-doesnt-pick-up-new-topics中可能存在此问题的重复,但这个问题无处可去。 我看到kafka日志并没有错误,它只是不会触发重新平衡。 当消费者加入或死亡时触发重新平衡,但是在创建新主题时不会触发(即使将分区添加到现有主题,但这是另一个主题)。 我正在使用kafka 0.10.0.0和“新消费者API”的官方Java客户端,意思是代理GroupCoordinator而不是胖客户端+ zookeeper。 这是示例消费者的代码: public class SampleConsumer { public static void main(String[] args) throws IOException { KafkaConsumer consumer; try (InputStream props = Resources.getResource(“consumer.props”).openStream()) { Properties properties = new Properties(); properties.load(props); properties.setProperty(“group.id”, “my-group”); System.out.println(properties.get(“group.id”)); consumer = new KafkaConsumer(properties); } Pattern pattern = Pattern.compile(“mytopic.+”); consumer.subscribe(pattern, new […]

Apache Storm Trident和Kafka Spout Integration

我无法找到正确整合Kafka与Apache Storm Trident的良好文档。 我试着在这里查看以前发布的相关问题,但没有足够的信息。 我想将Trident与Kafka连接为OpaqueTridentKafkaSpout。 以下是目前正在运行的示例代码 GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(properties.getProperty(“topic”, “mytopic”)); Broker brokerForPartition0 = new Broker(“IP1”,9092); Broker brokerForPartition1 = new Broker(“IP2”, 9092); Broker brokerForPartition2 = new Broker(“IP3:9092”); globalPartitionInformation.addPartition(0, brokerForPartition0);//mapping from partition 0 to brokerForPartition0 globalPartitionInformation.addPartition(1, brokerForPartition1);//mapping from partition 1 to brokerForPartition1 globalPartitionInformation.addPartition(2, brokerForPartition2);//mapping from partition 2 to brokerForPartition2 StaticHosts staticHosts = new StaticHosts(globalPartitionInformation); TridentKafkaConfig […]