像下面这样的东西怎么样?
它使用 15 分钟的窗口,窗口结束后其状态会被清理。它还使用一个自定义触发器,每秒触发一次窗口计算。在窗口操作方面,ReduceFunction 只保留每个 guid 的最新状态,WindowFunction 则发出 (state, 1) 元组。随后我们按该状态分组(keyBy)并求和。我认为这应该能得到你想要的结果。
// Streaming job: keep the latest event per guid inside a 15-minute window,
// emit (state, 1) for each guid every second, then count guids per state.
val env = StreamExecutionEnvironment.getExecutionEnvironment

// Reading from Kafka requires a consumer (a source); a producer is a sink
// and cannot be handed to addSource.
val stream = env.addSource(new FlinkKafkaConsumer(...))

val results = stream
  .keyBy(_.guid)
  .timeWindow(Time.minutes(15))
  // Custom trigger: evaluates the window every 1000 ms of processing time.
  .trigger(ProcessingTimeTriggerWithPeriodicFirings(1000))
  .apply(
    // Reduce step: always keep the newer of the two events for this guid.
    (_, latest) => latest,
    // Window step: emit one (state, 1) tuple for the retained event.
    (k, w, i, c: Collector[(String, Long)]) => {
      // headOption avoids an exception on an empty iterable; null heads are
      // skipped exactly as before.
      i.headOption.filter(_ != null).foreach(e => c.collect((e.state, 1L)))
    }
  )
  // Re-key by the state (tuple field 0) and sum the counts (tuple field 1)
  // over one-second windows.
  .keyBy(0)
  .timeWindow(Time.seconds(1))
  .sum(1)
  .addSink(new ElasticsearchSink(...))

env.execute("Count States")
ProcessingTimeTriggerWithPeriodicFirings 定义如下:
/** Companion factory so the trigger can be created without `new`. */
object ProcessingTimeTriggerWithPeriodicFirings {

  /**
   * Creates a trigger with the given firing interval.
   *
   * @param intervalMs interval between periodic firings, in milliseconds
   * @return a new trigger instance configured with `intervalMs`
   */
  def apply(intervalMs: Long): ProcessingTimeTriggerWithPeriodicFirings =
    new ProcessingTimeTriggerWithPeriodicFirings(intervalMs)
}
/**
 * Processing-time trigger that fires a window periodically and purges it
 * once the window end is reached.
 *
 * On the first element of a window it registers one timer at the window end
 * and one timer `intervalMs` after the current wall-clock time. Every
 * periodic timer fires the window and schedules the next one, as long as the
 * next one still falls before the window end.
 *
 * NOTE(review): the "start-time" state is never cleared by this trigger;
 * depending on the Flink version in use, overriding `clear` may be
 * appropriate -- confirm against the Trigger API of that version.
 *
 * @param intervalMs interval between periodic firings, in milliseconds;
 *                   must be positive
 */
class ProcessingTimeTriggerWithPeriodicFirings(intervalMs: Long)
  extends Trigger[Event, TimeWindow] {

  // A non-positive interval would re-register a timer at the same (or an
  // earlier) timestamp on every firing and never make progress.
  require(intervalMs > 0, s"intervalMs must be positive, got $intervalMs")

  // Per-key/window state holding the window start. The default of 0 is used
  // as the "no element seen yet" marker in onElement.
  private val startTimeDesc =
    new ValueStateDescriptor[Long]("start-time", classOf[Long], 0L)

  /**
   * Registers the timers when the first element of a window arrives.
   * Never fires by itself.
   */
  override def onElement(
      element: Event,
      timestamp: Long,
      window: TimeWindow,
      ctx: TriggerContext): TriggerResult = {
    val startTime = ctx.getPartitionedState(startTimeDesc)
    if (startTime.value == 0) {
      // First element for this key/window: remember the start and schedule
      // both the end-of-window purge and the first periodic firing.
      startTime.update(window.getStart)
      ctx.registerProcessingTimeTimer(window.getEnd)
      ctx.registerProcessingTimeTimer(System.currentTimeMillis() + intervalMs)
    }
    TriggerResult.CONTINUE
  }

  /**
   * Purges the window at (or after) its end; otherwise fires it and
   * schedules the next periodic timer.
   */
  override def onProcessingTime(
      time: Long,
      window: TimeWindow,
      ctx: TriggerContext): TriggerResult = {
    if (time >= window.getEnd) {
      // A periodic timer landing after the window end must not fire or
      // reschedule, so it is treated like the end-of-window timer.
      TriggerResult.PURGE
    } else {
      val nextFiring = time + intervalMs
      // Stop the periodic chain at the window end: the end timer is already
      // registered, and scheduling past it would keep timers alive forever.
      if (nextFiring < window.getEnd) {
        ctx.registerProcessingTimeTimer(nextFiring)
      }
      TriggerResult.FIRE
    }
  }

  /** Event time is not used by this trigger. */
  override def onEventTime(
      time: Long,
      window: TimeWindow,
      ctx: TriggerContext): TriggerResult =
    TriggerResult.CONTINUE
}