你能帮我如何使用Kafka流来实现这一点吗?
场景:对订单数据的所有发票进行分组。在实时流媒体中,接收发票可能会有延迟。所以我们想在加入之前等待 20 分钟对所有发票进行分组。
示例:订单“x”有 3 张发票,预计将在 20 分钟内收到。
预期输出:订单和 3 张发票应作为输出主题中的单个数据提供。
我们有以下拓扑来实现这一点。
我们分别有订单流和发票流
我们根据订单键对发票进行分组。我们设置了 20 分钟翻滚窗口
将订单数据与生成的发票组连接起来
将输出写入新主题
问题:步骤 3 不等待步骤 2 完成。收到订单后立即加入操作。所以我们没有得到预期的输出。
我们尝试使用连接窗口来实现相同的目的。但由于连接窗口是滑动窗口,我们在输出主题中得到重复数据。
对于上面的例子,如果我们使用连接窗口而不是翻转窗口,我们将得到 3 个输出数据,订单分别有 1 个发票、2 个发票和 3 个发票。
请帮助我解决此问题或建议任何替代方法
代码片段:
KTable<Windowed<String>, List<InvoiceList>> invoiceList= invoiceStream
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(1200)))
.aggregate(() -> new ArrayList<InvoiceList>(),
(key, newValue, agg) -> {
new KeyValue<>(key, agg.add(newValue));
return agg;
},
Materialized.as("invoice-list").with(Serdes.String(), new ArrayListSerde<InvoiceList>(AppSerdes.InvoiceList())))
KStream<String, Order> orderOutput=
orderStream.join(invoiceList, Joiner);
orderOutput.to(AppConfig.OutputTopic.OUTPUT_ORDER,Produced.with(Serdes.String(), AppSerdes.Order()));