我正在使用 Flink Stream 处理 3G 网络(GPRS 隧道协议)中的数据流量日志。而且我在用户的用户会话中合成信息时遇到了麻烦。
例如:如何映射开始和结束一个会话。我不知道 Flink 流媒体是否适合处理这样的复杂协议?
p/s:
我们捕获 3G 网络中 SGSN 和 GGSN 之间交换的数据(使用带有 GTP-C/U 消息的 GTP 协议)。当 SGSN 发送CreateReq (TEID, Seq, IMSI, TEID_dl,TEID_data_dl)消息和 GGSN 响应CreateRsp(TEID_dl, Seq, TEID_ul, TEID_data_ul)消息时,会话开始。会话建立后,SGSN向GGSN发送的其他GTP-C消息(例如:UpdateReq、DeleteReq)使用TEID_ul,响应消息使用TEID_dl,GTP-U消息使用TEID_data_ul(SGSN -> GGSN)和TEID_data_dl(GGSN -> SGSN) )。GTP-U 消息包含 AppID (facebook, twitter, web), url,...
最后,我想处理连续的日志数据流并映射同一用户的 GTP-C 消息和 GTP-U (IMSI ) 作报告。
我试过这个:
val sessions = createReqs.connect(createRsps).flatMap(new CoFlatMapFunction[CreateReq, CreateRsp, Session] {
// holds CreateReqs indexed by (tedid_dl,seq)
private val createReqs = mutable.HashMap.empty[(String, String), CreateReq]
// holds CreateRsps indexed by (tedid,seq)
private val createRsps = mutable.HashMap.empty[(String, String), CreateRsp]
override def flatMap1(req: CreateReq, out: Collector[Session]): Unit = {
val key = (req.teid_dl, req.header.seqNum)
val oRsp = createRsps.get(key)
if (!oRsp.isEmpty) {
val rsp = oRsp.get
println("OK")
out.collect(new Session(rsp.header.time, req.imsi, req.teid_dl, req.teid_ddl, rsp.teid_upl, rsp.teid_dupl, req.rat, req.apn))
createRsps.remove(key)
} else {
createReqs.put(key, req)
}
}
override def flatMap2(rsp: CreateRsp, out: Collector[Session]): Unit = {
val key = (rsp.header.teid, rsp.header.seqNum)
val oReq = createReqs.get(key)
if (!oReq.isEmpty) {
val req = oReq.get
out.collect(new Session(rsp.header.time, req.imsi, req.teid_dl, req.teid_ddl, rsp.teid_upl, rsp.teid_dupl, req.rat, req.apn))
createReqs.remove(key)
} else {
createRsps.put(key, rsp)
}
}
}).print()
此代码始终返回空结果。输入流包含同一会话的 CreateRsp 和 CreateReq 消息的事实。它们看起来非常接近(在 1 秒内)。当我调试时,oReq.isEmpty == true每次。我做错了什么?