0

谁能帮忙。我有以下要求。

要求:处理非重复,订购聊天消息并根据 ProgramUserId 将它们捆绑在一起,这是过程和涉及的主题。

数据设置:ProgramUserId 可以有任意数量的消息,但每条消息都是唯一的,并且有一个复合键:MsgId + Action。所以想象一下kafka中的数据如下所示。

P2->M3+A1 , P2->M2+A1 , P2->M1+A1 , P1->M3+A1 , P1->M2+A2 , P1->M2+A1 , P1->M1+A1

我现在正在这样做:

Initial-Topic:(原始密钥:ProgramUserId)

1)从 Initial-Topic --> 使用 Kstream(重新键入:Msg Id + Action)--> 然后写入主题:dedup-Topic

  1. dedup-Topic --> 使用 Kstream(重新键入原始密钥:ProgramUserId)--> 写入主题: Final-Topic

由于我们在 dedup-topic 上重新键入,因此消息的顺序会混乱,因为重新键入会导致重新分区,因此订单中没有保证。

我添加了以下逻辑来实现重复数据删除:从dedup-topic创建 Ktable 和 Postgres 表(使用 Sink 连接)。对于每条传入的消息,检查 Ktable 和 PG 表中的键(Msg Id + Action)。如果未找到记录,则表示它没有重复并将该记录写入 dedup-topic。

但是由于在dedup-Topic 中重新键入/重新分区,上述消息顺序混乱。

请帮助此时如何实现有序的消息?

4

0 回答 0