如何注册无状态处理器(似乎也需要StateStore)?

我正在构建拓扑,并希望使用KStream.process()将一些中间值写入数据库。 此步骤不会更改数据的性质,并且完全是无状态的。

添加处理器需要创建一个ProcessorSupplier并将此实例与状态存储的名称一起传递给KStream.process()函数。 这是我不明白的。

如何将StateStore对象添加到拓扑中,因为它需要StateStoreSupplier ?

无法添加所述StateStore会在应用程序启动时出现此错误:

线程“main”中的exceptionorg.apache.kafka.streams.errors.TopologyBuilderException:无效的拓扑构建:尚未添加StateStore my-state-store。

为什么处理器需要有状态存储? 对于无状态且不维持状态的处理器来说,这似乎是可选的。

通过应用处理器来处理此流中的所有元素,一次一个元素。

这是一个关于如何使用状态存储的简单示例 ,取自Kafka Streams上的Confluent Platform文档 。

第1步:定义StateStore / StateStoreSupplier

 StateStoreSupplier countStore = Stores.create("Counts") .withKeys(Serdes.String()) .withValues(Serdes.Long()) .persistent() .build(); 
  1. 我没有看到将StateStore对象添加到拓扑的方法。 它也需要StateStoreSupplier。

步骤2:将状态存储添加到拓扑中。

选项A – 使用Processor API时:

 TopologyBuilder builder = new TopologyBuilder(); // add the source processor node that takes Kafka topic "source-topic" as input builder.addSource("Source", "source-topic") .addProcessor("Process", () -> new WordCountProcessor(), "Source") // Add the countStore associated with the WordCountProcessor processor .addStateStore(countStore, "Process") .addSink("Sink", "sink-topic", "Process"); 

选项B – 使用Kafka Streams DSL时:

在这里,您需要调用KStreamBuilder#addStateStore("name-of-your-store")将状态存储添加到处理器拓扑中。 然后,在调用KStream#process()KStream#transform() ,还必须传入状态存储的名称 – 否则您的应用程序将在运行时失败。

KStream#transform()为例:

 KStreamBuilder builder = new KStreamBuilder(); // Add the countStore that will be used within the Transformer[Supplier] // that we pass into `transform()` below. builder.addStateStore(countStore); KStream input = builder.stream("source-topic"); KStream transformed = input.transform(/* your TransformerSupplier */, countStore.name()); 

为什么处理器需要有状态存储? 对于无状态且不维持状态的处理器来说,这似乎是可选的。

你是对的 – 如果你的处理器没有维持状态,你就不需要一个状态存储。

使用DSL时,您只需调用KStreamBuilder#addStateStore("name-of-your-store")将状态存储添加到处理器拓扑中,稍后再引用它。