0

我正在使用 Flink 1.2-Snapshot。我的数据如下所示:

  • id=25398102,sourceId=1,ts=2016-10-15 00:00:56,user=14,value=919
  • id=25398185,sourceId=1,ts=2016-10-15 00:01:06,user=14,value=920
  • id=25398210,sourceId=1,ts=2016-10-15 00:01:16,user=14,value=944
  • id=25398235, sourceId=1, ts=2016-10-15 00:01:24, user=3149, value=944
  • id=25398236, sourceId=1, ts=2016-10-15 00:01:25, user=71, value=955
  • id=25398239, sourceId=1, ts=2016-10-15 00:01:26, user=71, value=955
  • id=25398265, sourceId=1, ts=2016-10-15 00:01:36, user=71, value=955
  • id=25398310, sourceId=1, ts=2016-10-15 00:02:16, user=14, value=960
  • id=25398320, sourceId=1, ts=2016-10-15 00:02:26, user=14, value=1000

我正在运行以下代码来创建基于 Windows 的用户 ID:

    stream.flatMap(new LogsParser())
            .assignTimestampsAndWatermarks(new MessageTimestampExtractor())
            .keyBy("sourceId")
            .window(GlobalWindows.create())
            .trigger(PurgingTrigger.of(new MySessionTrigger()))
            .apply(new SessionWindowFunction())
            .print();

MySession 触发器查看接收到的事件并检查用户 ID 以触发用户 ID 更改窗口。SessionWindowFunction 只是在窗口外创建一个会话。

以下是创建的会话:

  1. 会议:

    • id=25398102,sourceId=1,ts=2016-10-15 00:00:56,user=14,value=919
    • id=25398185,sourceId=1,ts=2016-10-15 00:01:06,user=14,value=920
    • id=25398210,sourceId=1,ts=2016-10-15 00:01:16,user=14,value=944
    • id=25398235, sourceId=1, ts=2016-10-15 00:01:24, user=3149, value=944
  2. 会议:

    • id=25398236, sourceId=1, ts=2016-10-15 00:01:25, user=71, value=955
    • id=25398239, sourceId=1, ts=2016-10-15 00:01:26, user=71, value=955
    • id=25398265, sourceId=1, ts=2016-10-15 00:01:36, user=71, value=955
    • id=25398310, sourceId=1, ts=2016-10-15 00:02:16, user=14, value=960
  3. 会议:

    • id=25398320, sourceId=1, ts=2016-10-15 00:02:26, user=14, value=1000

如您所见,问题是在每个会话中,最后一个事件实际上属于下一个窗口。由于最后一个事件已经在窗口中,因此触发窗口的决定有点晚了。

如何在不考虑该窗口中的最后一个事件的情况下触发该窗口?

4

1 回答 1

0

一种想法是在用户 ID 更改时使用平面图将标记插入流中。然后,您的触发器函数可以在看到这些标记之一时触发,并且您的会话窗口函数可以过滤掉这些标记。

于 2016-11-11T15:39:36.237 回答