1

你能帮我如何使用Kafka流来实现这一点吗?

场景:对订单数据的所有发票进行分组。在实时流媒体中,接收发票可能会有延迟。所以我们想在加入之前等待 20 分钟对所有发票进行分组。

示例:订单“x”有 3 张发票,预计将在 20 分钟内收到。

预期输出:订单和 3 张发票应作为输出主题中的单个数据提供。

我们有以下拓扑来实现这一点。

  1. 我们分别有订单流和发票流

  2. 我们根据订单键对发票进行分组。我们设置了 20 分钟翻滚窗口

  3. 将订单数据与生成的发票组连接起来

  4. 将输出写入新主题

问题:步骤 3 不等待步骤 2 完成。收到订单后立即加入操作。所以我们没有得到预期的输出。

我们尝试使用连接窗口来实现相同的目的。但由于连接窗口是滑动窗口,我们在输出主题中得到重复数据。

对于上面的例子,如果我们使用连接窗口而不是翻转窗口,我们将得到 3 个输出数据,订单分别有 1 个发票、2 个发票和 3 个发票。

请帮助我解决此问题或建议任何替代方法

代码片段:

 KTable<Windowed<String>, List<InvoiceList>> invoiceList= invoiceStream
                .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofSeconds(1200)))
                .aggregate(() -> new ArrayList<InvoiceList>(),
                        (key, newValue, agg) -> {
                            new KeyValue<>(key, agg.add(newValue));
                            return agg;
                        },
                        Materialized.as("invoice-list").with(Serdes.String(), new ArrayListSerde<InvoiceList>(AppSerdes.InvoiceList())))
 
KStream<String, Order> orderOutput=
 
                orderStream.join(invoiceList, Joiner);
 
       
        orderOutput.to(AppConfig.OutputTopic.OUTPUT_ORDER,Produced.with(Serdes.String(), AppSerdes.Order()));
4

2 回答 2

1

我假设,订单首先出现,然后是发票,而不是其他方式。如果我的假设是正确的,那么您的逻辑将不起作用。因为当订单进入您的 KStream 时,可能没有发票,因此连接不会获取任何发票。请记住,KStream-KTable 连接是非窗口连接,可以像查找 KTable(更改日志流)一样使用。

于 2020-09-25T16:06:11.593 回答
0

在我们的案例中,这种加入力是有效的。所以我们将它作为两个单独的流接收,并在消费者上添加了自定义逻辑来处理我们的用例。

谢谢!

于 2020-11-04T00:37:37.643 回答