0

我有一个主题,其中包含每个会话的用户连接和断开连接事件。我想使用 Kafka 流来处理这个主题并根据某些条件更新 KTable。每条记录都不能更新 KTable。所以我需要处理多条记录才能知道是否需要更新 KTable。

例如,按用户然后按 sessionid 处理流和聚合。如果该用户的至少一个 sessionid 只有 Connected 事件,则 KTable 必须作为用户在线更新(如果尚未更新)。
如果用户的所有 sessionId 都有 Disconnected 事件,KTable 必须更新为用户离线,如果还没有。

我怎样才能实现这样的逻辑?
我们是否可以在所有应用程序实例中实现这个 KTable,以便每个实例在本地都有这些数据?

4

1 回答 1

0

听起来像是一个相当复杂的场景。

也许,最好在这种情况下使用处理器 API?AKTable基本上只是一个 KV 存储,并且使用处理器 API,允许您应用复杂的处理来决定是否要更新状态存储。AKTable本身不允许您应用复杂的逻辑,但它会应用它收到的每个更新。

因此,使用 DSL,您需要进行一些处理,并且如果您想更新一条KTable更新记录,请仅针对这种情况发送一条更新记录。像这样的东西:

KStream stream = builder.stream("input-topic");
// apply your processing and write an update record into `updates` when necessary
KStream updates = stream...
KTable table = updates.toTable();
于 2021-01-02T22:54:15.337 回答