0

我的要求是将 30 天的数据保存到流中,以便在任何一天进行处理。所以当 FLINK 应用程序启动的第一天,它将从数据库中获取 30 天的数据并合并到当前的流数据中。我的挑战是 - 管理 30 天的数据窗口。如果我创建滑动窗口 30 天,滑动时间为 1 天。就像是

WatermarkStrategy<EventResponse> wmStrategy = WatermarkStrategy.<EventResponse>forBoundedOutOfOrderness(Duration.ofMillis(1))
                .withTimestampAssigner((eventResponse, l) -> eventResponse.getLocalDateTime().toEpochSecond(ZoneOffset.MAX));

        ds.assignTimestampsAndWatermarks(wmStrategy)
                .windowAll(SlidingEventTimeWindows.of(Time.days(30), Time.days(1)))
        .process(new ProcessAllWindowFunction<EventResponse, Object, TimeWindow>() {

            @Override
            public void process(Context context, Iterable<EventResponse> iterable, Collector<Object> collector) throws Exception {
    --- proccessing logic
}

在这种情况下,当添加历史数据的第一个元素时, process() 不会立即开始处理。我的假设是```a)默认情况下,第一个事件将是第一个窗口的一部分,并且可以立即进行处理。b) 第二天的工作将从窗口中删除最后 29 天的数据。我的假设对那段代码是否正确?感谢您对此的帮助。

4

1 回答 1

1

在这种情况下,我认为您的假设不正确。当您使用TimeWindowwithProcessFunction时,意味着该功能能够在窗口关闭时处理数据(在您的情况下是 30 天后)。在这种情况下,slide时间窗口意味着第二个窗口将包含第一个窗口的 29 天和不属于第一个窗口的第 31 天。

于 2020-07-30T11:16:48.053 回答