我们将 KTable 实现为 Internal-State-Store。
a.) 我如何以及在哪里可以指定,这个 Internal-State-Store 应该是 Persistent 并自动备份到另一个 kafka 主题?
b.) 我们如何指定这个 Internal-State-Store 应该是全局的,这样我的任何流任务都应该能够引用它?
c.) 是否存在将传入的 messageRecords 写入 Internal-State-Store 的频率?会不会发生这样的情况,一个特定的 MessageRecord 被流处理器处理,存储在 KTable 中,然后我的流处理器死了,它无法进入 Internal-State-Store !
下面是我们现在使用的片段:-
KTable<String, String> KT0 = streamsBuilder.table(AppConfigs.topicName, Materialized.as(AppConfigs.stateStoreName)));
任何回应都将受到高度赞赏!