因此,实际上,我已经为此苦苦挣扎了几天。我正在使用来自 4 个主题的记录。我需要通过 TimedWindow 聚合记录。时间到了,我想向接收器主题发送批准的消息或未批准的消息。这可能与卡夫卡流有关吗?
似乎它会将每条记录都归入新主题,即使窗口仍然打开,这真的不是我想要的。
这是简单的代码:
builder.stream(getTopicList(), Consumed.with(Serdes.ByteArray(),
Serdes.ByteArray()))
.flatMap(new ExceptionSafeKeyValueMapper<String,
FooTriggerMessage>("", Serdes.String(),
fooTriggerSerde))
.filter((key, value) -> value.getTriggerEventId() != null)
.groupBy((key, value) -> value.getTriggerEventId().toString(),
Serialized.with(Serdes.String(), fooTriggerSerde))
.windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(30))
.advanceBy(TimeUnit.SECONDS.toMillis(30)))
.aggregate(() -> new BarApprovalMessage(), /* initializer */
(key, value, aggValue) -> getApproval(key, value, aggValue),/*adder*/
Materialized
.<String, BarApprovalMessage, WindowStore<Bytes, byte[]>>as(
storeName) /* state store name */
.withValueSerde(barApprovalSerde))
.toStream().to(appProperties.getBarApprovalEngineOutgoing(),
Produced.with(windowedSerde, barApprovalSerde));
截至目前,每条记录都被下沉到传出主题,我只希望它在窗口关闭时发送一条消息,可以这么说。
这可能吗?