Tag: apache kafka

如何在kafka 0.9.0中使用multithreading消费者?

卡夫卡的文件给出了以下描述的方法: 每个线程一个消费者:一个简单的选择是为每个线程提供自己的消费者>实例。 我的代码: public class KafkaConsumerRunner implements Runnable { private final AtomicBoolean closed = new AtomicBoolean(false); private final CloudKafkaConsumer consumer; private final String topicName; public KafkaConsumerRunner(CloudKafkaConsumer consumer, String topicName) { this.consumer = consumer; this.topicName = topicName; } @Override public void run() { try { this.consumer.subscribe(topicName); ConsumerRecords records; while (!closed.get()) { synchronized (consumer) { records = […]

如何在Scala中为Kafka(带分区的commitSync)公开Java方法?

我试图通过Scala公开Java方法(有关原始java方法的更多细节 – 它来自Kafka ) 这是原始的Java方法: public void commitSync(Map offsets) 如何向Scala中的方法公开和传递参数? 我有类似的东西: def commitSync() = { consumer.commitSync(…) } 谢谢。

带解码器问题的Kafka Avro Consumer

当我尝试使用我的相应模式使用Avro运行Kafka Consumer时 ,它返回错误“AvroRuntimeException:格式错误的数据。长度为负:-40”。 我看到其他人有类似的问题,将字节数组转换为json , Avro写入和读取 ,以及Kafka Avro Binary *编码器 。 我也引用了这个消费者组示例 ,它们都很有帮助,但到目前为止这个错误没有任何帮助..它可以工作到这部分代码(第73行) 解码器解码器= DecoderFactory.get()。binaryDecoder(byteArrayInputStream,null); 我已经尝试了其他解码器并打印出byteArrayInputStream变量的内容,看起来我相信你会期望序列化的avro数据看起来(在消息中我可以看到模式和一些数据以及一些格式错误的数据)我打印出来了使用.available()方法可用的字节,返回594.我无法理解为什么会发生此错误。 Apache Nifi用于生成具有来自hdfs的相同模式的Kafka流。 我将不胜感激任何帮助。

是否可以在Kafka 0.8.2中为现有主题添加分区

我有一个运行2个分区的Kafka集群。 我一直在寻找一种方法将分区数增加到3.但是,我不想丢失主题中的现有消息。 我尝试停止Kafka,修改server.properties文件以将分区数增加到3并重新启动Kafka。 但是,这似乎没有任何改变。 使用Kafka ConsumerOffsetChecker ,我仍然看到它只使用了2个分区。 我使用的Kafka版本是0.8.2.2。 在0.8.1版本中,曾经有一个名为kafka-add-partitions.sh的脚本,我想这可能会成功。 但是,我在0.8.2中没有看到任何这样的脚本。 有没有办法实现这个? 我确实尝试创建一个全新的主题,对于那个主题,它似乎根据server.properties文件中的更改使用了3个分区。 但是,对于现有主题,它似乎并不关心。

无法读取工件描述符:IntelliJ

我遇到了我的Maven POM文件的问题,它无法找到火花依赖并且返回错误:无法读取org.apache.spark的工件描述符:spark-streaming-kafka_2.10:jar:1.2.1 我已经确认它不是任何公司防火墙的问题,因为所有其他依赖项都正确加载,只是这个。 我也能够在我的maven设置中确认它正试图从以下回购中获取。 我尝试删除本地计算机上的.m2 repo以重新加载它,仍然没有骰子。 http://repo.maven.apache.org/maven2/org/apache/spark/spark-streaming-kafka_2.10/1.2.1/ 下面是我的pom文件 my.group.id sentiment 1.0-SNAPSHOT NPITWITTER com.sparkjava spark-core 1.1.1 org.apache.spark spark-streaming-kafka_2.10 1.2.1 org.apache.spark spark-core_2.10 1.2.1 org.apache.spark spark-streaming_2.10 1.2.1 org.apache.spark spark-hive_2.10 1.2.1 org.apache.spark spark-sql_2.10 1.2.1

从本地计算机连接到Docker中运行的Kafka

我在本地计算机上使用docker设置单节点基本Kafka部署,如Confluent Kafka文档中所述 (步骤2-3)。 另外,我还暴露了zookeeper的端口2181和kafka的端口9092,这样我就能从本地机器上运行的java客户端连接到它们: $ docker run -d \ -p 2181:2181 \ –net=confluent \ –name=zookeeper \ -e ZOOKEEPER_CLIENT_PORT=2181 \ confluentinc/cp-zookeeper:4.1.0 $ docker run -d \ –net=confluent \ –name=kafka \ -p 9092:9092 \ -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \ -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092 \ -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \ confluentinc/cp-kafka:4.1.0 问题:当我尝试从主机连接到kafka时,连接失败,因为它无法解析地址:kafka:9092。 这是我的Java代码: Properties props = new Properties(); props.put(“bootstrap.servers”, “localhost:9092”); props.put(“client.id”, “KafkaExampleProducer”); props.put(“key.serializer”, LongSerializer.class.getName()); […]

将Kafka输入流动态连接到多个输出流

Kafka Streams内置了哪些function,允许将单个输入流动态连接到多个输出流? KStream.branch允许基于true / false谓词进行分支,但这不是我想要的。 我希望每个传入的日志确定它将在运行时流式传输的主题,例如,日志{“date”: “2017-01-01”}将流式传输到主题topic-2017-01-01和日志{“date”: “2017-01-02”}将流式传输到主题topic-2017-01-02 。 我可以在流上调用forEach ,然后写给Kafka制作人,但这看起来并不优雅。 在Streams框架中有更好的方法吗?

Kafka:编写自定义序列化程序

我正在尝试用Kafka 0.8.1建立一个POC。 我使用自己的java类作为Kafka消息,它有一堆String数据类型。 我不能使用默认的序列化程序类或Kafka库附带的String序列化程序类。 我想我需要编写自己的序列化程序并将其提供给生产者属性。 如果您知道在Kafka中编写示例自定义序列化程序(在java中),请分享。 非常感谢,非常感谢。