这是一个关于如何使用状态存储的简单示例,取自Kafka Streams 上的 Confluent Platform 文档。
第 1 步:定义StateStore
/ StateStoreSupplier
:
StateStoreSupplier countStore = Stores.create("Counts")
.withKeys(Serdes.String())
.withValues(Serdes.Long())
.persistent()
.build();
- 我没有看到将 StateStore 对象添加到我的拓扑的方法。它也需要一个 StateStoreSupplier。
第 2 步:将状态存储添加到拓扑中。
选项 A - 使用处理器 API 时:
TopologyBuilder builder = new TopologyBuilder();
// add the source processor node that takes Kafka topic "source-topic" as input
builder.addSource("Source", "source-topic")
.addProcessor("Process", () -> new WordCountProcessor(), "Source")
// Add the countStore associated with the WordCountProcessor processor
.addStateStore(countStore, "Process")
.addSink("Sink", "sink-topic", "Process");
选项 B - 使用 Kafka Streams DSL 时:
在这里,您需要调用KStreamBuilder#addStateStore("name-of-your-store")
以将状态存储添加到您的处理器拓扑。然后,当调用诸如KStream#process()
or之类的方法时KStream#transform()
,您还必须传入状态存储的名称——否则您的应用程序将在运行时失败。
在以下示例中KStream#transform()
:
KStreamBuilder builder = new KStreamBuilder();
// Add the countStore that will be used within the Transformer[Supplier]
// that we pass into `transform()` below.
builder.addStateStore(countStore);
KStream<byte[], String> input = builder.stream("source-topic");
KStream<String, Long> transformed =
input.transform(/* your TransformerSupplier */, countStore.name());
为什么处理器需要有状态存储?对于无状态且不维护状态的处理器,这似乎很可能是可选的。
你是对的——如果你的处理器不保持状态,你就不需要状态存储。
使用 DSL 时,您只需调用KStreamBuilder#addStateStore("name-of-your-store")
将状态存储添加到您的处理器拓扑并稍后引用它。