这是输入的 Kafka 主题,其中包含ConnectionEvent
:
ConnectionEvent("John", "123", "CONNECTED")
ConnectionEvent("John", "123", "DISCONNECTED")
ConnectionEvent("Anna", "222", "CONNECTED")
ConnectionEvent("Rohan", "334", "CONNECTED")
ConnectionEvent("Anna", "199", "CONNECTED")
ConnectionEvent("Anna", "255", "CONNECTED")
ConnectionEvent("Anna", "255", "DISCONNECTED")
ConnectionEvent("Anna", "222", "DISCONNECTED")
流和缩减逻辑
主题中的每个项目都使用消息键作为用户 ID发送。例如,“安娜”。
流必须按以下方式处理:
- John 只有 1 个会话 123 连接和断开连接。所以他下线了
- Rohan 只有 1 个未断开的会话 334。所以他上线了
- Anna 有 3 个会话(222、199、255),其中 2 个会话断开连接。所以她上线了
KTable 必须有以下数据:
John Offline
Rohan Online
Anna Online
我尝试的是这样的:
KTable<String, String> connectedSessions = stream.groupBy((k,v) -> v.getSessionId()) //Group by user and then by sessionId
.reduce((agg, newVal) -> agg) //Take latest value ie, reduce pair of records for each session to 1
.filter(x -> x.getState == CONNECTED) //Filter only session records which has CONNECTED has last state
但是现在,我将如何将复合键(用户,sessionId)取消分组为仅用户,然后根据最新状态为 CONNECTED 的 sessionId 的数量将用户标记为在线/离线?