Tag: apache kafka streams

Kafka Streams表转换

我在SQL Server中有一个表,我想流式传输给Kafka主题,结构如下: (UserID, ReportID) 该表将不断更改(记录添加,插入,无更新) 我想将其转换为这种结构并放入Elasticsearch: { “UserID”: 1, “Reports”: [1, 2, 3, 4, 5, 6] } 我到目前为止看到的示例是日志或点击流,但在我的情况下不起作用。 这种用例是否可行? 我总是可以看看UserID变化和查询数据库,但这看起来很幼稚而不是最好的方法。 更新 import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.*; import java.util.ArrayList; import java.util.Properties; public class MyDemo { public static void main(String… args) { System.out.println(“Hello KTable!”); final Serde longSerde = […]

kafka KStream – 采用n秒计数的拓扑

我有一个JSON对象流,我键入一些值的哈希值。 我希望在n秒(10?60?)间隔内按键计数,并使用这些值进行一些模式分析。 我的拓扑: K->aggregateByKey(n seconds)->process() 在这个process – init()步骤Ive调用了ProcessorContent.schedule(60 * 1000L) ,希望调用.punctuate() 。 从这里开始,我将遍历内部哈希中的值并相应地执行操作。 我看到值通过聚合步骤并命中process()函数,但永远不会调用.punctuate() 。 码: KStreamBuilder kStreamBuilder = new KStreamBuilder(); KStream opxLines = kStreamBuilder.stream(TOPIC); KStream mapped = opxLines.map(new ReMapper()); KTable<Windowed, String> ktRtDetail = mapped.aggregateByKey( new AggregateInit(), new OpxAggregate(), TimeWindows.of(“opx_aggregate”, 60000)); ktRtDetail.toStream().process(new ProcessorSupplier<Windowed, String>() { @Override public Processor<Windowed, String> get() { return new AggProcessor(); } […]

在Kafka Streams中反序列化POJO

我的Kafka主题包含此格式的消息 user1,subject1,80|user1,subject2,90 user2,subject1,70|user2,subject2,100 and so on. 我创建了用户POJO如下。 class User implements Serializable{ /** * */ private static final long serialVersionUID = -253687203767610477L; private String userId; private String subject; private String marks; public User(String userId, String subject, String marks) { super(); this.userId = userId; this.subject = subject; this.marks = marks; } public String getUserId() { return userId; […]

Kafka Streams:错误退出的正确方法

我已经成功地获得了一个使用,转换和生成数据的流应用程序,但我注意到,流处理器会定期转换到ERROR状态,并且该进程将在不退出的情况下坐在那里。 显示我的日志: All stream threads have died. The instance will be in error state and should be closed. 有没有办法告诉Streams应用程序一旦达到ERROR状态就退出? 也许是各种监视器线程? 我看到Kafka Streams代码的注释中的引用给需要在应用程序达到此状态时关闭应用程序的用户,但是,我无法在文档中找到提及此任务的内容。 有一个简单的方法来执行此关闭步骤吗? 可能是错误的方式可能关闭错误 我的目的是在KafkaStreams对象上设置UncaughtExceptionHandler方法,以执行以下操作: 记录错误 使用原始KafkaStreams对象上的close方法关闭流 结果是: 记录exception消息 INFO org.apache.kafka.streams.KafkaStreams … State transition from ERROR to PENDING_SHUTDOWN INFO org.apache.kafka.streams.processor.internals.StreamThread … Informed to shut down 然后,不幸的是,这个过程似乎没有退出。 FWIW我觉得这可能是对setUncaughtExceptionHandler的误用

为什么我看不到Kafka Streams reduce方法的任何输出?

给出以下代码: KStream stream = builder.stream(Serdes.String(), customSerde, “test_in”); stream .groupByKey(Serdes.String(), customSerde) .reduce(new CustomReducer(), “reduction_state”) .print(Serdes.String(), customSerde); 我在Reducer的apply方法中有一个println语句,当我希望减少时会成功打印出来。 但是,上面显示的最终打印语句不显示任何内容。 同样,如果我使用方法而不是print ,我在目标主题中看不到任何消息。 在reduce语句之后我需要什么来查看减少的结果? 如果一个值被推送到输入,我不希望看到任何东西。 如果按下具有相同键的第二个值,我希望减少器应用(它确实如此),并且我还期望减少的结果继续到处理管道中的下一步。 如上所述,我在管道的后续步骤中没有看到任何内容,我不明白为什么。

如何注册无状态处理器(似乎也需要StateStore)?

我正在构建拓扑,并希望使用KStream.process()将一些中间值写入数据库。 此步骤不会更改数据的性质,并且完全是无状态的。 添加处理器需要创建一个ProcessorSupplier并将此实例与状态存储的名称一起传递给KStream.process()函数。 这是我不明白的。 如何将StateStore对象添加到拓扑中,因为它需要StateStoreSupplier ? 无法添加所述StateStore会在应用程序启动时出现此错误: 线程“main”中的exceptionorg.apache.kafka.streams.errors.TopologyBuilderException:无效的拓扑构建:尚未添加StateStore my-state-store。 为什么处理器需要有状态存储? 对于无状态且不维持状态的处理器来说,这似乎是可选的。 通过应用处理器来处理此流中的所有元素,一次一个元素。

Kafka KStream – 使用带窗口的AbstractProcessor

我希望将来自KStream的窗口批输出组合在一起并将它们写入辅助存储。 我期待看到.punctuate()大约每30秒调用一次。 我得到的是保存在这里 。 (原始文件长达数千行) 总结 – .punctuate()看似随机然后重复调用。 它似乎不符合通过ProcessorContext.schedule()设置的值。 编辑: 另一次运行相同的代码大约每四分钟调用一次.punctuate() 。 这次我没有看到疯狂的重复值。 来源没有变化 – 只是结果不同。 使用以下代码: 主要 StreamsConfig streamsConfig = new StreamsConfig(config); KStreamBuilder kStreamBuilder = new KStreamBuilder(); KStream lines = kStreamBuilder.stream(TOPIC); lines.process(new BPS2()); KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig); kafkaStreams.start(); 处理器 public class BP2 extends AbstractProcessor { private static final Logger LOGGER = LoggerFactory.getLogger(BP2.class); […]

Libat上的UnsatisfiedLinkError在使用Kafka Streams进行开发时会破坏DB dll

我正在开发Windows机器上编写Kafka Streams应用程序。 如果我尝试使用Kafka Streams的leftJoin和branchfunction,我在执行jar应用程序时会收到以下错误: Exception in thread “StreamThread-1” java.lang.UnsatisfiedLinkError: C:\Users\user\AppData\Local\Temp\librocksdbjni325337723194862275.dll: Can’t find dependent libraries at java.lang.ClassLoader$NativeLibrary.load(Native Method) at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941) at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824) at java.lang.Runtime.load0(Runtime.java:809) at java.lang.System.load(System.java:1086) at org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:78) at org.rocksdb.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:56) at org.rocksdb.RocksDB.loadLibrary(RocksDB.java:64) at org.rocksdb.RocksDB.(RocksDB.java:35) at org.rocksdb.Options.(Options.java:22) at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:115) at org.apache.kafka.streams.state.internals.Segment.openDB(Segment.java:38) at org.apache.kafka.streams.state.internals.Segments.getOrCreateSegment(Segments.java:75) at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:72) at org.apache.kafka.streams.state.internals.ChangeLoggingSegmentedBytesStore.put(ChangeLoggingSegmentedBytesStore.java:54) at org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.put(MeteredSegmentedBytesStore.java:101) at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:109) at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:101) at org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:65) at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48) […]

Kafka KStreams – 处理超时

我试图使用带有TimeWindows.of(“name”, 30000) .process()批量处理一些KTable值并发送它们。 似乎30秒超过了消费者超时间隔,之后Kafka认为该消费者已经解散并释放分区。 我已经尝试提高轮询频率和提交间隔以避免这种情况: config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, “5000”); config.put(StreamsConfig.POLL_MS_CONFIG, “5000”); 不幸的是,这些错误仍在发生: (很多这些) ERROR oakspinternals.RecordCollector – Error sending record to topic kafka_test1-write_aggregate2-changelog org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for kafka_test1-write_aggregate2-changelog-0 其次是: INFO oakcciAbstractCoordinator – Marking the coordinator 12.34.56.7:9092 (id: 2147483547 rack: null) dead for group kafka_test1 WARN oakspinternals.StreamThread – […]

将Kafka输入流动态连接到多个输出流

Kafka Streams内置了哪些function,允许将单个输入流动态连接到多个输出流? KStream.branch允许基于true / false谓词进行分支,但这不是我想要的。 我希望每个传入的日志确定它将在运行时流式传输的主题,例如,日志{“date”: “2017-01-01”}将流式传输到主题topic-2017-01-01和日志{“date”: “2017-01-02”}将流式传输到主题topic-2017-01-02 。 我可以在流上调用forEach ,然后写给Kafka制作人,但这看起来并不优雅。 在Streams框架中有更好的方法吗?