5

我正在构建拓扑并希望使用KStream.process()将一些中间值写入数据库。此步骤不会改变数据的性质,并且是完全无状态的。

添加处理器需要创建一个ProcessorSupplier并将此实例KStream.process()与状态存储的名称一起传递给函数。这是我不明白的。

由于需要StateStoreSupplier ,如何将StateStore对象添加到拓扑?

在应用程序启动时未能添加 say 会导致StateStore此错误:

线程“主”org.apache.kafka.streams.errors.TopologyBuilderException 中的异常:无效的拓扑构建:StateStore my-state-store 尚未添加。

为什么处理器需要有状态存储?对于无状态且不维护状态的处理器,这似乎很可能是可选的。

通过应用处理器处理此流中的所有元素,一次一个元素。

4

1 回答 1

12

这是一个关于如何使用状态存储的简单示例,取自Kafka Streams 上的 Confluent Platform 文档

第 1 步:定义StateStore/ StateStoreSupplier

StateStoreSupplier countStore = Stores.create("Counts")
                                      .withKeys(Serdes.String())
                                      .withValues(Serdes.Long())
                                      .persistent()
                                      .build();
  1. 我没有看到将 StateStore 对象添加到我的拓扑的方法。它也需要一个 StateStoreSupplier。

第 2 步:将状态存储添加到拓扑中。

选项 A - 使用处理器 API 时:

TopologyBuilder builder = new TopologyBuilder();

// add the source processor node that takes Kafka topic "source-topic" as input
builder.addSource("Source", "source-topic")
       .addProcessor("Process", () -> new WordCountProcessor(), "Source")
       // Add the countStore associated with the WordCountProcessor processor
       .addStateStore(countStore, "Process")
       .addSink("Sink", "sink-topic", "Process");

选项 B - 使用 Kafka Streams DSL 时:

在这里,您需要调用KStreamBuilder#addStateStore("name-of-your-store")以将状态存储添加到您的处理器拓扑。然后,当调用诸如KStream#process()or之类的方法时KStream#transform(),您还必须传入状态存储的名称——否则您的应用程序将在运行时失败。

在以下示例中KStream#transform()

KStreamBuilder builder = new KStreamBuilder();

// Add the countStore that will be used within the Transformer[Supplier]
// that we pass into `transform()` below.
builder.addStateStore(countStore);

KStream<byte[], String> input = builder.stream("source-topic");

KStream<String, Long> transformed =
    input.transform(/* your TransformerSupplier */, countStore.name());

为什么处理器需要有状态存储?对于无状态且不维护状态的处理器,这似乎很可能是可选的。

你是对的——如果你的处理器不保持状态,你就不需要状态存储。

使用 DSL 时,您只需调用KStreamBuilder#addStateStore("name-of-your-store")将状态存储添加到您的处理器拓扑并稍后引用它。

于 2016-08-22T18:55:07.543 回答