我想动态保留最后 1 小时事件的缓冲区。该缓冲区应该给我一个重播功能,以便可以对最后一小时的数据执行查询。Flink 中是否已经实现了一些东西?还是我需要自己构建它?
我尝试使用 Window API,但似乎 Flink 没有给我一个向前移动的固定宽度时间窗口。
我想动态保留最后 1 小时事件的缓冲区。该缓冲区应该给我一个重播功能,以便可以对最后一小时的数据执行查询。Flink 中是否已经实现了一些东西?还是我需要自己构建它?
我尝试使用 Window API,但似乎 Flink 没有给我一个向前移动的固定宽度时间窗口。
我得到了我自己问题的解决方案,但我想保留这个问题,以防你有更好的解决方案。因为我的绝对违反了函数式编程的一些良好实践。
我的技巧如下。
val keyedEventStream: KeyedStream[E]
// create a stream of [hourly window as a set of events]
val eventWindowStream = keyedEventStream.timeWindow(Time.minutes(60), Time.milliseconds(50)).fold(scala.collection.Set[E]())((set: scala.collection.Set[E], event: E) => set + event)
// This is the hourly buffer my process logic will use
var workWindow = scala.collection.Set[E]()
// update the workspace window with the stream of hourly window.
eventWindowStream.map((set: scala.collection.Set[W]) => workWindow = set)
可以看到,last map的唯一目的就是更新变量workWindow,这其实是内联函数的一个副作用……