Tag: apache kafka

SpringXD和Spring Integration:每隔X分钟从kafka主题中读取一遍,然后发送到另一个主题

我正在尝试实现一个解决方案来创建一个由kafka源,桥接模块和kafka接收器组成的SpringXD流。 所以我有类似的东西: 我的问题是我想以某种方式避免使用轮询器。 基本上是因为当这些消息在队列中时我想避免将消息保留在内存中。 我宁愿每隔X分钟从kafka读取一次,只需从队列中取出Y消息,然后将这些消息发送到下一个主题。 看起来我无法摆脱队列,但后来我的问题是:还有其他选择吗? 我不喜欢把东西留在内存中,但我也不想使用这个选项: http : //docs.spring.io/spring-integration/reference/html/system-management-chapter.html#message -商店

如何暂停卡夫卡消费者?

我在我的框架中使用Kafka生产者 – 消费者模型。 消费者端消耗的记录随后被索引到elasticsearch上。 在这里我有一个用例,如果ES关闭,我将不得不暂停kafka消费者,直到ES启动,一旦启动,我需要恢复消费者并消耗我上次离开的记录。 我不认为这可以用@KafkaListener实现。 有谁能请给我一个解决方案吗? 我想我需要为此编写自己的KafkaListenerContainer,但我无法正确实现它。 任何帮助将非常感激。

如何动态地将主题传递给kafka监听器?

几天后,我正在尝试将主题动态传递给Kafka监听器,而不是通过Java DSL中的密钥使用它们。 周围的人之前做过这个或者可以说明实现这个目标的最佳方法是什么?

Kafka Streams表转换

我在SQL Server中有一个表,我想流式传输给Kafka主题,结构如下: (UserID, ReportID) 该表将不断更改(记录添加,插入,无更新) 我想将其转换为这种结构并放入Elasticsearch: { “UserID”: 1, “Reports”: [1, 2, 3, 4, 5, 6] } 我到目前为止看到的示例是日志或点击流,但在我的情况下不起作用。 这种用例是否可行? 我总是可以看看UserID变化和查询数据库,但这看起来很幼稚而不是最好的方法。 更新 import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.*; import java.util.ArrayList; import java.util.Properties; public class MyDemo { public static void main(String… args) { System.out.println(“Hello KTable!”); final Serde longSerde = […]

使用Java更新kafka中特定主题的TTL

更新主题的TTL ,以便记录在主题中保留10天。 我必须为特定主题执行此操作,只需将所有其他主题TTL保持相同,当前配置,我必须使用java执行此操作,因为我正在通过Java将主题推送到kafka 。 我正在设置以下属性以将主题推送到kafka Properties props = new Properties(); props.put(“bootstrap.servers”, KAFKA_SERVERS); props.put(“acks”, ACKS); props.put(“retries”, RETRIES); props.put(“linger.ms”, new Integer(LINGER_MS)); props.put(“buffer.memory”, new Integer(BUFFER_MEMORY)); props.put(“key.serializer”, “org.apache.kafka.common.serialization.StringSerializer”); props.put(“value.serializer”, “org.apache.kafka.common.serialization.StringSerializer”);

kafka-connect错误:无法找到或加载主类

我正在遵循官方文档来实现 kakf-connect以从文件中读取数据。 我有完美的kafka跑步。 生产者和消费者发送和接收消息。 但是,当我运行以下命令时: sudo ./bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties 我收到以下错误: 错误:无法找到或加载主类org.apache.kafka.connect.cli.ConnectStandalone 我交叉检查,我有ConnectStandalone文件connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone 。 我的connect-file-source.properties如下: name=local-file-source connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector tasks.max=1 file=test.txt topic=spark-kafka 有什么遗失的吗? 我该怎么做才能摆脱这个错误?

kafka KStream – 采用n秒计数的拓扑

我有一个JSON对象流,我键入一些值的哈希值。 我希望在n秒(10?60?)间隔内按键计数,并使用这些值进行一些模式分析。 我的拓扑: K->aggregateByKey(n seconds)->process() 在这个process – init()步骤Ive调用了ProcessorContent.schedule(60 * 1000L) ,希望调用.punctuate() 。 从这里开始,我将遍历内部哈希中的值并相应地执行操作。 我看到值通过聚合步骤并命中process()函数,但永远不会调用.punctuate() 。 码: KStreamBuilder kStreamBuilder = new KStreamBuilder(); KStream opxLines = kStreamBuilder.stream(TOPIC); KStream mapped = opxLines.map(new ReMapper()); KTable<Windowed, String> ktRtDetail = mapped.aggregateByKey( new AggregateInit(), new OpxAggregate(), TimeWindows.of(“opx_aggregate”, 60000)); ktRtDetail.toStream().process(new ProcessorSupplier<Windowed, String>() { @Override public Processor<Windowed, String> get() { return new AggProcessor(); } […]

spring boot kafka consumer – 如何从spring boot中正确使用kafka消息

我正在开发一个弹簧启动应用程序,它假设使用kafka消息。 我有一个奇怪的结果:当我使用kafka-console-producer.sh发送消息时,我的消费者只检测并打印另一条消息。 例如 – 在kafka控制台生产者中,我输入“one” – > Enter – >“two” – > Enter – >“three” – > Enter。 在我的春季靴子消费者中,我只会看到“两个”,“四个”等… 我的ConsumeConfigFactory.java: import java.util.Properties; import javax.annotation.PostConstruct; import kafka.consumer.ConsumerConfig; import org.springframework.stereotype.Component; @Component public class ConsumerConfigFactory { private static final String ZK_CONNECT = “10.211.55.2:2181”; private ConsumerConfig consumerConfig; @PostConstruct private void createConsumerConfig() { Properties props = new Properties(); props.put(“zookeeper.connect”, ZK_CONNECT); […]

连接到Apache Kafka多节点群集中的Zookeeper

我按照以下说明设置了多节点kafka群集。 现在,如何连接到zookeeper? 是否可以从JAVA中的生产者/消费者端连接到一个zookeeper,或者有没有办法连接所有zookeeper节点? 设置多节点Apache ZooKeeper集群 在群集的每个节点上,将以下行添加到文件kafka / config / zookeeper.properties server.1=zNode01:2888:3888 server.2=zNode02:2888:3888 server.3=zNode03:2888:3888 #add here more servers if you want initLimit=5 syncLimit=2 在群集的每个节点上,在dataDir属性表示的文件夹中创建名为myid的文件(默认情况下,文件夹为/ tmp / zookeeper)。 myid文件应该只包含znode的id(zNode01为’1’,ZNode02为’2’等等) 设置多代理Apache Kafka集群 在集群的每个节点上,修改文件kafka / config / server.properties中的属性zookeeper.connect: zookeeper.connect=zNode01:2181,zNode02:2181,zNode03:2181 在集群的每个节点上,从文件kafka / config / server.properties修改属性host.name:host.name = zNode0x 在集群的每个节点上,从文件kafka / config / server.properties修改属性broker.id(集群中的每个代理都应具有唯一的ID)

Kafka – 如何在Producer类中获取失败的消息详细信息

Kafka允许通过Producer(KafkaProducer)类的以下方法发送异步消息: public java.util.concurrent.Future send(ProducerRecord record) public java.util.concurrent.Future send(ProducerRecord record, Callback callback) 成功可以通过处理 1) Future对象或 2)回调调用的onCompletion方法。 完整方法签名和onCompletion使用如下( 取自kafka docs ) ` ProducerRecord record = new ProducerRecord(“the-topic”, key, value); producer.send(record, new Callback() { public void onCompletion(RecordMetadata metadata, Exception e) { if(e != null) e.printStackTrace(); System.out.println(“The offset of the record we just sent is: ” + metadata.offset()); } […]