0

我正在使用 Flink Stream 处理 3G 网络(GPRS 隧道协议)中的数据流量日志。而且我在用户的用户会话中合成信息时遇到了麻烦。

例如:如何映射开始和结束一个会话。我不知道 Flink 流媒体是否适合处理这样的复杂协议?

p/s:
我们捕获 3G 网络中 SGSN 和 GGSN 之间交换的数据(使用带有 GTP-C/U 消息的 GTP 协议)。当 SGSN 发送CreateReq (TEID, Seq, IMSI, TEID_dl,TEID_data_dl)消息和 GGSN 响应CreateRsp(TEID_dl, Seq, TEID_ul, TEID_data_ul)消息时,会话开始。会话建立后,SGSN向GGSN发送的其他GTP-C消息(例如:UpdateReq、DeleteReq)使用TEID_ul,响应消息使用TEID_dl,GTP-U消息使用TEID_data_ul(SGSN -> GGSN)和TEID_data_dl(GGSN -> SGSN) )。GTP-U 消息包含 AppID (facebook, twitter, web), url,...
最后,我想处理连续的日志数据流并映射同一用户的 GTP-C 消息和 GTP-U (IMSI ) 作报告。

我试过这个:

val sessions = createReqs.connect(createRsps).flatMap(new CoFlatMapFunction[CreateReq, CreateRsp, Session] {
  // holds CreateReqs indexed by (tedid_dl,seq)
  private val createReqs = mutable.HashMap.empty[(String, String), CreateReq]
  // holds CreateRsps indexed by (tedid,seq)
  private val createRsps = mutable.HashMap.empty[(String, String), CreateRsp]


  override def flatMap1(req: CreateReq, out: Collector[Session]): Unit = {
    val key = (req.teid_dl, req.header.seqNum)
    val oRsp = createRsps.get(key)
    if (!oRsp.isEmpty) {
      val rsp = oRsp.get
      println("OK")
      out.collect(new Session(rsp.header.time, req.imsi, req.teid_dl, req.teid_ddl, rsp.teid_upl, rsp.teid_dupl, req.rat, req.apn))
      createRsps.remove(key)
    } else {
      createReqs.put(key, req)
    }
  }

  override def flatMap2(rsp: CreateRsp, out: Collector[Session]): Unit = {
    val key = (rsp.header.teid, rsp.header.seqNum)
    val oReq = createReqs.get(key)

    if (!oReq.isEmpty) {
      val req = oReq.get
      out.collect(new Session(rsp.header.time, req.imsi, req.teid_dl, req.teid_ddl, rsp.teid_upl, rsp.teid_dupl, req.rat, req.apn))
      createReqs.remove(key)
    } else {
      createRsps.put(key, rsp)
    }
  }
}).print()


此代码始终返回空结果。输入流包含同一会话的 CreateRsp 和 CreateReq 消息的事实。它们看起来非常接近(在 1 秒内)。当我调试时,oReq.isEmpty == true每次。我做错了什么?

4

1 回答 1

0

老实说,这里的电信细节有点难以看清,但如果我理解正确的话,你至少有 3 个流,前两个是CreateReqCreateRsp流。

为了检测会话的建立,我将使用ConnectedDataStream抽象在上述两个流之间共享状态。查看此示例以了解用法或相关的Flink 文档

这是你想要达到的目标吗?

于 2015-10-11T12:08:26.167 回答