Kafka quickstart: what dependencies do I need?

I am working through the Kafka quickstart:

http://kafka.apache.org/07/quickstart.html

and the basic consumer group example:

https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

I have written a Consumer and a ConsumerThreadPool along the lines of that example:

    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;

    public class Consumer implements Runnable {
        private KafkaStream<byte[], byte[]> m_stream;
        private Integer m_threadNumber;

        public Consumer(KafkaStream<byte[], byte[]> a_stream, Integer a_threadNumber) {
            m_threadNumber = a_threadNumber;
            m_stream = a_stream;
        }

        public void run() {
            // Blocks on hasNext() until a message arrives or the consumer is shut down.
            ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
            while (it.hasNext()) {
                System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
            }
            System.out.println("Shutting down Thread: " + m_threadNumber);
        }
    }

A couple of other details: I am using Spring to manage my ZooKeeper configuration:

    import javax.inject.Named;
    import java.util.Properties;

    import kafka.consumer.ConsumerConfig;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.ComponentScan;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    @ComponentScan("com.truecar.inventory.worker.core")
    public class AppConfig {

        @Bean
        @Named("consumerConfig")
        private static ConsumerConfig createConsumerConfig() {
            String zookeeperAddress = "127.0.0.1:2181";
            String groupId = "inventory";
            Properties props = new Properties();
            props.put("zookeeper.connect", zookeeperAddress);
            props.put("group.id", groupId);
            props.put("zookeeper.session.timeout.ms", "400");
            props.put("zookeeper.sync.time.ms", "200");
            props.put("auto.commit.interval.ms", "1000");
            return new ConsumerConfig(props);
        }
    }

I am building with Maven and the OneJar Maven plugin. However, when I compile and then run the resulting one-jar, I get the following error:

    Aug 26, 2013 6:15:41 PM org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider registerDefaultFilters
    INFO: JSR-330 'javax.inject.Named' annotation found and supported for component scanning
    Exception in thread "main" java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at com.simontuffs.onejar.Boot.run(Boot.java:340)
        at com.simontuffs.onejar.Boot.main(Boot.java:166)
    Caused by: java.lang.NoClassDefFoundError: scala/ScalaObject
        at java.lang.ClassLoader.defineClass1(Native Method)
        at java.lang.ClassLoader.defineClass(ClassLoader.java:792)
        at com.simontuffs.onejar.JarClassLoader.defineClass(JarClassLoader.java:803)
        at com.simontuffs.onejar.JarClassLoader.findClass(JarClassLoader.java:710)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at com.simontuffs.onejar.JarClassLoader.loadClass(JarClassLoader.java:630)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.getDeclaredMethods0(Native Method)
        at java.lang.Class.privateGetDeclaredMethods(Class.java:2521)
        at java.lang.Class.getDeclaredMethods(Class.java:1845)
        at org.springframework.core.type.StandardAnnotationMetadata.getAnnotatedMethods(StandardAnnotationMetadata.java:180)
        at org.springframework.context.annotation.ConfigurationClassParser.doProcessConfigurationClass(ConfigurationClassParser.java:222)
        at org.springframework.context.annotation.ConfigurationClassParser.processConfigurationClass(ConfigurationClassParser.java:165)
        at org.springframework.context.annotation.ConfigurationClassParser.parse(ConfigurationClassParser.java:140)
        at org.springframework.context.annotation.ConfigurationClassPostProcessor.processConfigBeanDefinitions(ConfigurationClassPostProcessor.java:282)
        at org.springframework.context.annotation.ConfigurationClassPostProcessor.postProcessBeanDefinitionRegistry(ConfigurationClassPostProcessor.java:223)
        at org.springframework.context.support.AbstractApplicationContext.invokeBeanFactoryPostProcessors(AbstractApplicationContext.java:630)
        at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:461)
        at org.springframework.context.annotation.AnnotationConfigApplicationContext.<init>(AnnotationConfigApplicationContext.java:73)
        at com.truecar.inventory.worker.core.consumer.ConsumerThreadPool.<init>(ConsumerThreadPool.java:31)
        at com.truecar.inventory.worker.core.application.Starter.main(Starter.java:20)
        ... 6 more
    Caused by: java.lang.ClassNotFoundException: scala.ScalaObject
        at com.simontuffs.onejar.JarClassLoader.findClass(JarClassLoader.java:713)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at com.simontuffs.onejar.JarClassLoader.loadClass(JarClassLoader.java:630)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 27 more

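Now, I know very little about Kafka and nothing about Scala. How do I fix this? What should I try next? Is this a known issue? Do I need additional dependencies? Here is the Kafka version in my pom.xml: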

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.9.2</artifactId>
        <version>0.8.0-beta1</version>
    </dependency>

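Update: I contacted the Kafka dev mailing list, and they told me about some specific version requirements for the Scala dependency. However, there is also an undocumented log4j dependency, which causes another exception at runtime rather than at compile time.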

    Exception in thread "main" java.lang.reflect.InvocationTargetException
    Caused by: java.lang.NoSuchMethodError: ch.qos.logback.classic.Logger.filterAndLog(Ljava/lang/String;Lorg/slf4j/Marker;Lch/qos/logback/classic/Level;Ljava/lang/String;[Ljava/lang/Object;Ljava/lang/Throwable;)V
        at org.apache.log4j.Category.log(Category.java:333)
        at org.apache.commons.logging.impl.Log4JLogger.debug(Log4JLogger.java:177)

Another update:

I found the correct log4j dependency:

    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>

But now I am hitting an even more cryptic runtime exception:

    Exception in thread "main" java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at com.simontuffs.onejar.Boot.run(Boot.java:340)
        at com.simontuffs.onejar.Boot.main(Boot.java:166)
    Caused by: java.lang.NoClassDefFoundError: org/I0Itec/zkclient/IZkStateListener
        at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:64)
        at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:66)
        at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:100)
        at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)

At this point I am getting that WTF feeling. So I added another dependency:

    <dependency>
        <groupId>com.101tec</groupId>
        <artifactId>zkclient</artifactId>
        <version>0.3</version>
    </dependency>

But that exposed yet another runtime exception:

    Exception in thread "main" java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at com.simontuffs.onejar.Boot.run(Boot.java:340)
        at com.simontuffs.onejar.Boot.main(Boot.java:166)
    Caused by: java.lang.NoClassDefFoundError: com/yammer/metrics/core/Gauge
        at kafka.consumer.ZookeeperConsumerConnector.createFetcher(ZookeeperConsumerConnector.scala:146)
        at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:113)
        at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:64)
        at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:66)
        at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:100)
        at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)

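I would love to get this baby example running, but maybe this is the price of using a beta product? Maybe I should switch to Apache ActiveMQ. But that sounds less fun. Am I missing something?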

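The problem is that the Kafka beta is built in such a way that the POM published with the jar is invalid. Maven cannot recognize and parse it properly, so it does not pull in the transitive dependencies. We have managed to mitigate this by declaring all the dependencies from that POM (Scala, ZK, etc.) in our own POM. We are waiting for the next Kafka beta release, in which the problem should be fixed.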

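The full dependency list is below. Note that you must change the Scala version of these dependencies to match the suffix of the Kafka artifact (for example, Scala 2.9.2 for kafka_2.9.2).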

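    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.15</version>
            <exclusions>
                <exclusion>
                    <groupId>com.sun.jmx</groupId>
                    <artifactId>jmxri</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.sun.jdmk</groupId>
                    <artifactId>jmxtools</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>javax.jms</groupId>
                    <artifactId>jms</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>net.sf.jopt-simple</groupId>
            <artifactId>jopt-simple</artifactId>
            <version>3.2</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.6.4</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.3</version>
        </dependency>
        <dependency>
            <groupId>com.yammer.metrics</groupId>
            <artifactId>metrics-core</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>com.yammer.metrics</groupId>
            <artifactId>metrics-annotation</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.easymock</groupId>
            <artifactId>easymock</artifactId>
            <version>3.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.scalatest</groupId>
            <artifactId>scalatest</artifactId>
            <version>1.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies>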
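Since each missing transitive dependency only shows up as a separate NoClassDefFoundError at runtime, it can help to check for all of them in one pass before starting the consumer. The following is a minimal sketch, not part of Kafka or of the original setup: the class name `ClasspathCheck` and the method `missing` are hypothetical, and the class names probed are simply the ones that appear in the stack traces quoted in the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: reports which of the given classes cannot be loaded.
final class ClasspathCheck {

    // Class names copied from the NoClassDefFoundError messages in the question.
    static final List<String> REQUIRED = Arrays.asList(
            "scala.ScalaObject",
            "org.I0Itec.zkclient.IZkStateListener",
            "com.yammer.metrics.core.Gauge");

    // Returns the subset of classNames that the current class loader cannot resolve.
    static List<String> missing(List<String> classNames) {
        List<String> result = new ArrayList<String>();
        ClassLoader loader = ClasspathCheck.class.getClassLoader();
        for (String name : classNames) {
            try {
                // initialize=false: only locate and load the class, do not run static initializers.
                Class.forName(name, false, loader);
            } catch (ClassNotFoundException | LinkageError e) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> absent = missing(REQUIRED);
        if (absent.isEmpty()) {
            System.out.println("All probed classes are on the classpath.");
        } else {
            System.out.println("Missing classes (add the matching dependency to the POM): " + absent);
        }
    }
}
```

Run from inside the packaged jar, this lists every probed class that is still missing, rather than surfacing them one exception at a time. It only covers the classes named in the list, so it is a quick sanity check and not a replacement for a valid POM.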

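As for

"Maybe I should switch to Apache ActiveMQ. But that sounds less fun. Am I missing something?"

Well, keep in mind that this is a beta. Rough edges like this do happen, but for now we are running Kafka 0.7 without trouble.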

I found that this dependency configuration works:

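    <dependencies>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>3.2.4.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>3.2.4.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.9.2</artifactId>
            <version>0.8.0-beta1</version>
        </dependency>
        <dependency>
            <groupId>javax.inject</groupId>
            <artifactId>javax.inject</artifactId>
            <version>1</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.3</version>
        </dependency>
        <dependency>
            <groupId>com.yammer.metrics</groupId>
            <artifactId>metrics-core</artifactId>
            <version>2.2.0</version>
        </dependency>
    </dependencies>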

This seems to work:

    $ git clone https://github.com/buildlackey/cep
    $ cd cep/kafka-0.8.x
    $ mvn package
    $ mvn exec:java -Dexec.mainClass=TestKafkaProducer

(Via the question "Where can I find a Maven repository for Kafka?")