0

我们将 KTable 实现为 Internal-State-Store。

a.) 我如何以及在哪里可以指定,这个 Internal-State-Store 应该是 Persistent 并自动备份到另一个 kafka 主题?

b.) 我们如何指定这个 Internal-State-Store 应该是全局的,这样我的任何流任务都应该能够引用它?

c.) 是否存在将传入的 messageRecords 写入 Internal-State-Store 的频率?会不会发生这样的情况,一个特定的 MessageRecord 被流处理器处理,存储在 KTable 中,然后我的流处理器死了,它无法进入 Internal-State-Store !

下面是我们现在使用的片段:-

KTable<String, String> KT0 = streamsBuilder.table(AppConfigs.topicName, Materialized.as(AppConfigs.stateStoreName)));

任何回应都将受到高度赞赏!

4

1 回答 1

0

a) 如果你有一个状态存储的自定义实现,你可以通过Materialized.as(KeyValueStoreSupplier).

b) 对于全局商店用例,您可以使用builder.globalKTable().

c) 写入发生在每条记录的基础上,但可以缓存在内存中。在提交输入主题偏移量之前,状态存储将被刷新,因此您永远不会错过任何数据。默认情况下,KafkaStreams 提供至少一次处理语义。

于 2021-02-28T19:22:02.387 回答