0

这是输入的 Kafka 主题,其中包含ConnectionEvent

ConnectionEvent("John", "123", "CONNECTED")
ConnectionEvent("John", "123", "DISCONNECTED")
ConnectionEvent("Anna", "222", "CONNECTED")
ConnectionEvent("Rohan", "334", "CONNECTED")
ConnectionEvent("Anna", "199", "CONNECTED")
ConnectionEvent("Anna", "255", "CONNECTED")
ConnectionEvent("Anna", "255", "DISCONNECTED")
ConnectionEvent("Anna", "222", "DISCONNECTED")

流和缩减逻辑

主题中的每个项目都使用消息键作为用户 ID发送。例如,“安娜”。
流必须按以下方式处理:

  • John 只有 1 个会话 123 连接和断开连接。所以他下线了
  • Rohan 只有 1 个未断开的会话 334。所以他上线了
  • Anna 有 3 个会话(222、199、255),其中 2 个会话断开连接。所以她上线了

KTable 必须有以下数据:

John Offline
Rohan Online
Anna Online

我尝试的是这样的:

KTable<String, String> connectedSessions = stream.groupBy((k,v) -> v.getSessionId()) //Group by user and then by sessionId
            .reduce((agg, newVal) -> agg)  //Take latest value ie, reduce pair of records for each session to 1
            .filter(x -> x.getState == CONNECTED)  //Filter only session records which has CONNECTED has last state

但是现在,我将如何将复合键(用户,sessionId)取消分组为仅用户,然后根据最新状态为 CONNECTED 的 sessionId 的数量将用户标记为在线/离线?

4

1 回答 1

0

AFAIU 一个用户在线,只要他的 CONNECTED 事件的数量大于 DISCONNECTED。因此,您可以汇总流中的连接数并检查它是否为正。就像是:

        KTable<String, String> connectedSessions = stream.groupByKey()
        .aggregate(
            () -> 0,
            (k, v, numberOfConnections) -> v.getState == CONNECTED ? numberOfConnections++ : numberOfConnections--)
        .mapValues((k, numberOfConnections) -> numberOfConnections > 0 ? "Online" : "Offline");
于 2020-06-18T16:59:12.903 回答