Tag: apache kafka

KafkaProducer未成功将消息发送到队列中

我在Windows PC上构建了一个小测试环境,并记下以下代码来测试kafka(使用org.apache.kafka中的kafka_2.10:0.9.0.1)。 package iii.functiontesting; import java.text.ParseException; import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; /** * Hello world! * */ public class test4 { public static void main( String[] args ) throws ParseException { Properties producerProps=new Properties(); producerProps.put(“bootstrap.servers”, “localhost:9092”); producerProps.put(“serializer.class”,org.apache.kafka.common.serialization.StringSerializer.class.getName()); producerProps.put(“key.serializer”,org.apache.kafka.common.serialization.StringSerializer.class.getName()); producerProps.put(“value.serializer”,org.apache.kafka.common.serialization.StringSerializer.class.getName()); producerProps.put(“request.required.acks”,”1″); KafkaProducer kafkawriter= new KafkaProducer(producerProps); ProducerRecord msg=new ProducerRecord(“TEST3″,”ImKey”,”teststring1″); kafkawriter.send(msg); } } 我使用以下命令检查消息是否正确写入队列 D:\ Work \ […]

java Kafka生产者错误

我做了kafka java生产者。 但控制台说错误。 kafka服务器在aws上。 和制作人在我的Mac上。 然而kara服务器是可以访问的。 当我从制作人发送消息时,kafka服务器显示“Accepted connection ..”。 有什么问题? 1 [main] INFO kafka.utils.VerifiableProperties – Verifying properties 28 [main] INFO kafka.utils.VerifiableProperties – Property metadata.broker.list is overridden to xxxxxx:9092 28 [main] INFO kafka.utils.VerifiableProperties – Property serializer.class is overridden to kafka.serializer.StringEncoder 137 [main] INFO kafka.client.ClientUtils$ – Fetching metadata from broker id:0,host: xxxxxx,port:9092 with correlation id 0 for […]

卡夫卡制作人类未找到例外

我尝试使用kafka实现一个简单的生成器使用者示例,并使用以下属性实现: Properties configProperties = new Properties(); configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,”localhost:” + portNumber); configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,”org.apache.kafka.common.serialization.ByteArraySerializer”); configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,”org.apache.kafka.common.serialization.StringSerializer”); // Belirtilen property ayarlarına sahip kafka producer oluşturulur org.apache.kafka.clients.producer.Producer producer = new KafkaProducer(configProperties); 然而,当我尝试完全相同的配置,以及其他一切相同,在另一个项目,这是一个数据可视化软件的插件,我得到这个错误: …. // Here there is some other stuff but I thing the important one is the below one Caused by: java.lang.NoClassDefFoundError: org/apache/kafka/clients/producer/Producer at App.MyControlPanel.(MyControlPanel.java:130) at App.CytoVisProject.(CytoVisProject.java:29) … 96 more Caused […]

Apache Kafka:无法更新Metadata / java.nio.channels.ClosedChannelException

我刚刚开始使用Apache Kafka / Zookeeper,并且在尝试在AWS上设置集群时遇到了问题。 目前我有三台服务器: 一个正在运行的Zookeeper和两个正在运行的Kafka。 我可以毫无问题地启动Kafka服务器,并可以在这两个服务器上创建主题。 然而,当我尝试在一台机器上启动一个生产者而在另一台机器上启动一个消费者时遇到了麻烦: 在卡夫卡制片人: kafka-console-producer.sh –broker-list :9092,:9092 –topic samsa 关于卡夫卡消费者: kafka-console-consumer.sh –zookeeper :2181 –topic samsa 我在制作人(“hi”)上输入一条消息,暂时没有任何反应。 然后我收到这条消息: ERROR Error when sending message to topic samsa with key: null, value: 2 bytes with error: Failed to update metadata after 60000 ms. (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) 在消费者方面,我收到此消息,该消息会定期重复: WARN Fetching topic metadata with correlation id # […]

Kafka消费者(0.8.2.2)可以批量阅读消息吗?

根据我的理解,Kafka消费者按顺序从指定的分区读取消息… 我们计划拥有多个Kafka使用者(Java),它们具有相同的组I ..如果它从指定的分区顺序读取,那么我们如何实现高吞吐量..例如,生产者发布消息,如每秒40个。消费者流程每秒消息1 ..虽然我们可以有多个消费者,但不能有40 rt ??? 如我错了请纠正我… 在我们的情况下,消费者只有在消息成功处理后才能提交偏移..这些消息将被重新处理…有没有更好的解决方案???

Kafka消费者配置/性能问题

我正在尝试将kafka作为AWS SQS的替代品。 动机主要是提高性能,其中kafka将消除限制,一次性提取10条消息,上限为256kb。 这是我的用例的高级场景。 我有一堆爬虫正在发送索引文件。 有效载荷的大小平均约为1mb。 爬虫调用SOAP端点,后者又运行生产者代码以将消息提交给kafka队列。 消费者应用程序获取消息并处理它们。 对于我的测试框,我已经为主题配置了30个分区和2个复制。 两个kafka实例正在运行1个zookeeper实例。 卡夫卡版本是0.10.0。 对于我的测试,我在队列中发布了700万条消息。 我创建了一个包含30个消费者线程的消费者组,每个分区一个。 我最初的印象是,与通过SQS获得的相比,这将大大加快处理能力。 不幸的是,事实并非如此。 就我而言,数据处理很复杂,平均需要1-2分钟才能完成。这导致了一系列的分区重新平衡,因为线程无法按时心跳。 我可以在日志引用中看到一堆消息 组full_group的自动偏移提交失败:由于组已经重新平衡并将分区分配给另一个成员,因此无法完成提交。 这意味着后续调用poll()之间的时间比配置的session.timeout.ms长,这通常意味着轮询循环花费了太多时间进行消息处理。 您可以通过增加会话超时或通过max.poll.records减少poll()中返回的批量的最大大小来解决此问题。 这导致多次处理相同的消息。 我尝试使用会话超时,max.poll.records和轮询时间来避免这种情况,但这会减慢整个处理时间。 这是一些配置参数。 metadata.max.age.ms = 300000 max.partition.fetch.bytes = 1048576 bootstrap.servers = [kafkahost1:9092, kafkahost2:9092] enable.auto.commit = true max.poll.records = 10000 request.timeout.ms = 310000 heartbeat.interval.ms = 100000 auto.commit.interval.ms = 1000 receive.buffer.bytes = 65536 fetch.min.bytes = 1 send.buffer.bytes […]

如何使用Java中的Structured Streaming从Kafka反序列化记录?

我使用Spark 2.1 。 我试图使用Spark Structured Streaming从Kafka读取记录,反序列化它们并在之后应用聚合。 我有以下代码: SparkSession spark = SparkSession .builder() .appName(“Statistics”) .getOrCreate(); Dataset df = spark .readStream() .format(“kafka”) .option(“kafka.bootstrap.servers”, kafkaUri) .option(“subscribe”, “Statistics”) .option(“startingOffsets”, “earliest”) .load(); df.selectExpr(“CAST(value AS STRING)”) 我想要的是将value字段反序列化为我的对象而不是作为String 。 我有一个自定义反序列化器。 public StatisticsRecord deserialize(String s, byte[] bytes) 我怎么能用Java做到这一点? 我找到的唯一相关链接是https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2 .html ,但这是针对Scala的。

为什么我看不到Kafka Streams reduce方法的任何输出?

给出以下代码: KStream stream = builder.stream(Serdes.String(), customSerde, “test_in”); stream .groupByKey(Serdes.String(), customSerde) .reduce(new CustomReducer(), “reduction_state”) .print(Serdes.String(), customSerde); 我在Reducer的apply方法中有一个println语句,当我希望减少时会成功打印出来。 但是,上面显示的最终打印语句不显示任何内容。 同样,如果我使用方法而不是print ,我在目标主题中看不到任何消息。 在reduce语句之后我需要什么来查看减少的结果? 如果一个值被推送到输入,我不希望看到任何东西。 如果按下具有相同键的第二个值,我希望减少器应用(它确实如此),并且我还期望减少的结果继续到处理管道中的下一步。 如上所述,我在管道的后续步骤中没有看到任何内容,我不明白为什么。

如何在Spark中将JavaPairInputDStream转换为DataSet / DataFrame

我正在尝试从kafka接收流数据。 在此过程中,我能够接收流数据并将其存储到JavaPairInputDStream中 。 现在我需要分析这些数据,而不是将其存储到任何数据库中。所以我想将此JavaPairInputDStream转换为DataSet或DataFrame 到目前为止我尝试的是: import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalog.Function; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.AbstractJavaDStreamLike; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaPairInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.kafka.KafkaUtils; import kafka.serializer.StringDecoder; import scala.Tuple2; //Streaming Working […]

Storm KafkaSpout停止使用来自Kafka Topic的消息

我的问题是Storm KafkaSpout在一段时间后停止使用来自Kafka主题的消息。 在storm中启用调试时,我得到如下日志文件: 2016-07-05 03:58:26.097 oasdtask [INFO] Emitting:packet_spout __metrics [#object [org.apache.storm.metric.api.IMetricsConsumer $ TaskInfo 0x2c35b34f“org.apache.storm.metric.api.IMetricsConsumer $ TaskInfo @ 2c35b34f“] [#object [org.apache.storm.metric.api.IMetricsConsumer $ DataPoint 0x798f1e35”[__ack-count = {default = 0}]“] #object [org.apache.storm.metric.api.IMetricsConsumer $ DataPoint 0x230867ec“[__sendqueue = {sojourn_time_ms = 0.0,write_pos = 5411461,read_pos = 5411461,overflow = 0,arrival_rate_secs = 0.0,capacity = 1024,population = 0}]”] #object [org.apache.storm.metric。 api.IMetricsConsumer $ DataPoint 0x7cdec8eb“[__ complete-latency […]