从0.7升级到0.8.1.1后生成嵌入式kafka队列时出错

我找不到任何可以直接处理我所遇到的问题的东西,所以我在这里发帖。 我有JUnit / JBehave测试,它们启动了嵌入式ZooKeeper服务器,嵌入式Kafka服务器以及kafka生产者和消费者。

将kafka从0.7升级到0.8.1.1后,我遇到以下类型的错误:

ERROR [kafka-request-handler-5] state.change.logger - Error on broker 1 while processing LeaderAndIsr request correlationId 7 received from controller 1 epoch 1 for partition [topicName,8] java.lang.NullPointerException: null at kafka.log.Log.(Log.scala:60) ~[kafka_2.10-0.8.1.1.jar:na] at kafka.log.LogManager.createLog(LogManager.scala:265) ~[kafka_2.10-0.8.1.1.jar:na] at kafka.cluster.Partition.getOrCreateReplica(Partition.scala:90) ~[kafka_2.10-0.8.1.1.jar:na] at kafka.cluster.Partition$$anonfun$makeLeader$2.apply(Partition.scala:175) ~[kafka_2.10-0.8.1.1.jar:na] at kafka.cluster.Partition$$anonfun$makeLeader$2.apply(Partition.scala:175) ~[kafka_2.10-0.8.1.1.jar:na] at scala.collection.immutable.Set$Set1.foreach(Set.scala:74) ~[scala-library-2.10.4.jar:na] at kafka.cluster.Partition.makeLeader(Partition.scala:175) ~[kafka_2.10-0.8.1.1.jar:na] at kafka.server.ReplicaManager$$anonfun$makeLeaders$5.apply(ReplicaManager.scala:305) ~[kafka_2.10-0.8.1.1.jar:na] at kafka.server.ReplicaManager$$anonfun$makeLeaders$5.apply(ReplicaManager.scala:304) ~[kafka_2.10-0.8.1.1.jar:na] at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) ~[scala-library-2.10.4.jar:na] at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) ~[scala-library-2.10.4.jar:na] at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) ~[scala-library-2.10.4.jar:na] at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) ~[scala-library-2.10.4.jar:na] at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) ~[scala-library-2.10.4.jar:na] at kafka.server.ReplicaManager.makeLeaders(ReplicaManager.scala:304) [kafka_2.10-0.8.1.1.jar:na] at kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:258) [kafka_2.10-0.8.1.1.jar:na] at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:217) [kafka_2.10-0.8.1.1.jar:na] at kafka.server.KafkaApis.handle(KafkaApis.scala:189) [kafka_2.10-0.8.1.1.jar:na] at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42) [kafka_2.10-0.8.1.1.jar:na] at java.lang.Thread.run(Thread.java:745) [na:1.8.0_25] 

 WARN [threadName] kcConsumerFetcherManager$LeaderFinderThread - [threadName], Failed to add leader for partitions [topicName,9],[topicName,3],[topicName,0],[topicName,8],[topicName,5],[topicName,1],[topicName,6],[topicName,2],[topicName,7],[topicName,4]; will retry kafka.common.NotLeaderForPartitionException: null at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:1.8.0_25] at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[na:1.8.0_25] at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:1.8.0_25] at java.lang.reflect.Constructor.newInstance(Constructor.java:408) ~[na:1.8.0_25] at java.lang.Class.newInstance(Class.java:438) ~[na:1.8.0_25] at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:73) ~[kafka_2.10-0.8.1.1.jar:na] at kafka.consumer.SimpleConsumer.earliestOrLatestOffset(SimpleConsumer.scala:160) ~[kafka_2.10-0.8.1.1.jar:na] at kafka.consumer.ConsumerFetcherThread.handleOffsetOutOfRange(ConsumerFetcherThread.scala:60) ~[kafka_2.10-0.8.1.1.jar:na] at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:179) ~[kafka_2.10-0.8.1.1.jar:na] at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:174) ~[kafka_2.10-0.8.1.1.jar:na] at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) ~[scala-library-2.10.4.jar:na] at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224) ~[scala-library-2.10.4.jar:na] at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403) ~[scala-library-2.10.4.jar:na] at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) ~[scala-library-2.10.4.jar:na] at kafka.server.AbstractFetcherThread.addPartitions(AbstractFetcherThread.scala:174) ~[kafka_2.10-0.8.1.1.jar:na] at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:86) ~[kafka_2.10-0.8.1.1.jar:na] at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:76) ~[kafka_2.10-0.8.1.1.jar:na] at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) ~[scala-library-2.10.4.jar:na] at scala.collection.immutable.Map$Map1.foreach(Map.scala:109) ~[scala-library-2.10.4.jar:na] at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) ~[scala-library-2.10.4.jar:na] at kafka.server.AbstractFetcherManager.addFetcherForPartitions(AbstractFetcherManager.scala:76) ~[kafka_2.10-0.8.1.1.jar:na] at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:95) ~[kafka_2.10-0.8.1.1.jar:na] at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) [kafka_2.10-0.8.1.1.jar:na] 

 02/03 10:26:34.655 WARN [kafka-request-handler-7] kafka.server.KafkaApis - [KafkaApi-1] Offset request with correlation id 0 from client clientName on partition [topicName,5] failed due to Leader not local for partition [topicName,5] on broker 1 

事实certificate,这与新KafkaServer构造函数中的Time参数有关。

我传入kafka.utils.Time对象的null参数:

 private KafkaServer server = new KafkaServer(config, null); 

相反,您需要创建kafka.utils.Time接口的实现,并传入一个新的实例:

 private KafkaServer server = new KafkaServer(config, new SystemTime()); private static class SystemTime implements Time { @Override public long milliseconds() { return System.currentTimeMillis(); } @Override public long nanoseconds() { return System.nanoTime(); } @Override public void sleep(long arg0) { try { Thread.sleep(arg0); } catch (InterruptedException e) { log.error("Kafka systemtime interrupted",e); } } }