我正在使用 Flink 1.2-Snapshot。我的数据如下所示:
- id=25398102,sourceId=1,ts=2016-10-15 00:00:56,user=14,value=919
- id=25398185,sourceId=1,ts=2016-10-15 00:01:06,user=14,value=920
- id=25398210,sourceId=1,ts=2016-10-15 00:01:16,user=14,value=944
- id=25398235, sourceId=1, ts=2016-10-15 00:01:24, user=3149, value=944
- id=25398236, sourceId=1, ts=2016-10-15 00:01:25, user=71, value=955
- id=25398239, sourceId=1, ts=2016-10-15 00:01:26, user=71, value=955
- id=25398265, sourceId=1, ts=2016-10-15 00:01:36, user=71, value=955
- id=25398310, sourceId=1, ts=2016-10-15 00:02:16, user=14, value=960
- id=25398320, sourceId=1, ts=2016-10-15 00:02:26, user=14, value=1000
我正在运行以下代码来创建基于 Windows 的用户 ID:
stream.flatMap(new LogsParser())
.assignTimestampsAndWatermarks(new MessageTimestampExtractor())
.keyBy("sourceId")
.window(GlobalWindows.create())
.trigger(PurgingTrigger.of(new MySessionTrigger()))
.apply(new SessionWindowFunction())
.print();
MySession 触发器查看接收到的事件并检查用户 ID 以触发用户 ID 更改窗口。SessionWindowFunction 只是在窗口外创建一个会话。
以下是创建的会话:
会议:
- id=25398102,sourceId=1,ts=2016-10-15 00:00:56,user=14,value=919
- id=25398185,sourceId=1,ts=2016-10-15 00:01:06,user=14,value=920
- id=25398210,sourceId=1,ts=2016-10-15 00:01:16,user=14,value=944
- id=25398235, sourceId=1, ts=2016-10-15 00:01:24, user=3149, value=944
会议:
- id=25398236, sourceId=1, ts=2016-10-15 00:01:25, user=71, value=955
- id=25398239, sourceId=1, ts=2016-10-15 00:01:26, user=71, value=955
- id=25398265, sourceId=1, ts=2016-10-15 00:01:36, user=71, value=955
- id=25398310, sourceId=1, ts=2016-10-15 00:02:16, user=14, value=960
会议:
- id=25398320, sourceId=1, ts=2016-10-15 00:02:26, user=14, value=1000
如您所见,问题是在每个会话中,最后一个事件实际上属于下一个窗口。由于最后一个事件已经在窗口中,因此触发窗口的决定有点晚了。
如何在不考虑该窗口中的最后一个事件的情况下触发该窗口?