我有一个主题,其中包含每个会话的用户连接和断开连接事件。我想使用 Kafka 流来处理这个主题并根据某些条件更新 KTable。每条记录都不能更新 KTable。所以我需要处理多条记录才能知道是否需要更新 KTable。
例如,按用户然后按 sessionid 处理流和聚合。如果该用户的至少一个 sessionid 只有 Connected 事件,则 KTable 必须作为用户在线更新(如果尚未更新)。
如果用户的所有 sessionId 都有 Disconnected 事件,KTable 必须更新为用户离线,如果还没有。
我怎样才能实现这样的逻辑?
我们是否可以在所有应用程序实例中实现这个 KTable,以便每个实例在本地都有这些数据?