Tag: apache kafka

Spark Kafka流媒体问题

我正在使用maven 我添加了以下依赖项 org.apache.spark spark-streaming_2.10 1.1.0 org.apache.spark spark-streaming-kafka_2.10 1.1.0 我还在代码中添加了jar SparkConf sparkConf = new SparkConf().setAppName(“KafkaSparkTest”); JavaSparkContext sc = new JavaSparkContext(sparkConf); sc.addJar(“/home/test/.m2/repository/org/apache/spark/spark-streaming-kafka_2.10/1.0.2/spark-streaming-kafka_2.10-1.0.2.jar”); JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(5000)); 它可以很好地解决任何错误,当我通过spark-submit运行时,我收到以下错误,非常感谢任何帮助。 谢谢你的时间。 bin/spark-submit –class “KafkaSparkStreaming” –master local[4] try/simple-project/target/simple-project-1.0.jar 线程“main”中的exceptionjava.lang.NoClassDefFoundError:org / apache / spark / streaming / kafka / KafkaUtils位于KafkaSparkStreaming.StarkStreamingTest(KafkaSparkStreaming.java:40),位于sun.reflect的KafkaSparkStreaming.main(KafkaSparkStreaming.java:23)。 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)中的NativeMethodAccessorImpl.invoke0(Native Method)位于sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)的java.lang.reflect.Method.invoke(方法。 java:606)org.apache.spark.deploy.SparkSubmit $ .launch(SparkSubmit.scala:303)atg.apache.spark.deploy.SparkSubmit $ .main(SparkSubmit.scala:55)at org.apache.spark .deploy.SparkSubmit.main(SparkSubmit.scala)引起:java.lang.ClassNotFoundException:java.net.URLClassLoader […]

如何为Kafka设置Java选项?

我一直在尝试使用Kafka,并从主站点上的文档中看到,您可以为jvm设置不同的选项,例如堆大小和它使用的垃圾收集器: http://kafka.apache.org/documentation.html#java 然而,它没有说的是如何/在何处设置这些选项。 该应用程序附带一个/ config目录,其中包含许多用于配置目的但没有用于Java的文件。 它还带有一个/ bin目录,其中包含一堆用于Kafka的脚本,但同样没有真正说明如何配置Java。 所以我的问题是,如何配置Kafka使用的Java选项? 是通过文件完成还是有不同的方式?

Kafka – 使用高级消费者实现延迟队列

想要使用高级消费者api实现延迟消费者 大意: 按密钥生成消息(每个消息包含创建时间戳)这可确保每个分区按生产时间排序消息。 auto.commit.enable = false(将在每个消息进程后显式提交) 消费一条消息 检查消息时间戳并检查是否已经过了足够的时间 进程消息(此操作永远不会失败) 提交1个偏移量 while (it.hasNext()) { val msg = it.next().message() //checks timestamp in msg to see delay period exceeded while (!delayedPeriodPassed(msg)) { waitSomeTime() //Thread.sleep or something…. } //certain that the msg was delayed and can now be handled Try { process(msg) } //the msg process will never fail […]

如何从Java中获取kafka服务器的主题列表

我正在使用kafka 0.8版本,非常新的。 我想知道在kafka server创建的主题列表及其元数据。 有没有API可以找到这个? 基本上,我需要编写一个Java消费者,它应该自动发现kafka server任何主题。有API来获取TopicMetadata ,但这需要topic的名称作为输入参数。我需要服务器中存在的所有主题的信息。

Kafka 0.8.2.2 – 无法发布消息

我们编写了一个用于向kafka发布消息的java客户端。 代码如下所示 Properties props = new Properties(); props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, “202.xx.xx.xxx:9092”); props.setProperty(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG,Integer.toString(5 * 1000)); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); //1. create KafkaProducer KafkaProducer producer = new KafkaProducer(props); //2 create callback Callback callback = new Callback() { public void onCompletion(RecordMetadata metadata, Exception e) { System.out.println(“Error while sending data”); if (e != null); e.printStackTrace(); } }; producer.send(record, callback); 当我们执行此代码时,我们会收到以下消息和exception ProducerConfig values: compression.type […]

Libat上的UnsatisfiedLinkError在使用Kafka Streams进行开发时会破坏DB dll

我正在开发Windows机器上编写Kafka Streams应用程序。 如果我尝试使用Kafka Streams的leftJoin和branchfunction,我在执行jar应用程序时会收到以下错误: Exception in thread “StreamThread-1” java.lang.UnsatisfiedLinkError: C:\Users\user\AppData\Local\Temp\librocksdbjni325337723194862275.dll: Can’t find dependent libraries at java.lang.ClassLoader$NativeLibrary.load(Native Method) at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941) at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824) at java.lang.Runtime.load0(Runtime.java:809) at java.lang.System.load(System.java:1086) at org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:78) at org.rocksdb.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:56) at org.rocksdb.RocksDB.loadLibrary(RocksDB.java:64) at org.rocksdb.RocksDB.(RocksDB.java:35) at org.rocksdb.Options.(Options.java:22) at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:115) at org.apache.kafka.streams.state.internals.Segment.openDB(Segment.java:38) at org.apache.kafka.streams.state.internals.Segments.getOrCreateSegment(Segments.java:75) at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:72) at org.apache.kafka.streams.state.internals.ChangeLoggingSegmentedBytesStore.put(ChangeLoggingSegmentedBytesStore.java:54) at org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.put(MeteredSegmentedBytesStore.java:101) at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:109) at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:101) at org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:65) at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48) […]

如何在kafka中创建自定义序列化程序?

只有很少的序列化器可用,如, org.apache.kafka.common.serialization.StringSerializer org.apache.kafka.common.serialization.StringSerializer 我们如何创建自己的自定义序列化程序?

Kafka KStreams – 处理超时

我试图使用带有TimeWindows.of(“name”, 30000) .process()批量处理一些KTable值并发送它们。 似乎30秒超过了消费者超时间隔,之后Kafka认为该消费者已经解散并释放分区。 我已经尝试提高轮询频率和提交间隔以避免这种情况: config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, “5000”); config.put(StreamsConfig.POLL_MS_CONFIG, “5000”); 不幸的是,这些错误仍在发生: (很多这些) ERROR oakspinternals.RecordCollector – Error sending record to topic kafka_test1-write_aggregate2-changelog org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for kafka_test1-write_aggregate2-changelog-0 其次是: INFO oakcciAbstractCoordinator – Marking the coordinator 12.34.56.7:9092 (id: 2147483547 rack: null) dead for group kafka_test1 WARN oakspinternals.StreamThread – […]

如何在java程序中获取kafka消耗滞后

我写了一个java程序来消耗来自kafka的消息。 我想监视消耗滞后,如何通过java获取它? 顺便说一句,我使用: org.apache.kafka kafka_2.11 0.10.1.1 提前致谢。

Eclipse scala.object无法解析

我正在尝试使用Eclipse在Java中编写Kafka生产者和消费者代码。 我已经下载了Kafka jar文件并作为外部Jar文件加载。 它解决了依赖问题。 但是,始终存在未解决的错误,并且消息如下所示: Multiple markers at this line – The type scala.Product cannot be resolved. It is indirectly referenced from required .class files – The type scala.Serializable cannot be resolved. It is indirectly referenced from required .class files 我真的不知道发生了什么以及如何解决这个错误。 谢谢。