我实际上正在处理一个流,接收一堆字符串,需要计算所有字符串。总和是加总的,这意味着对于第二条记录,总和被添加到输出前一天必须是一些看起来像的 json 文件
{
"aggregationType" : "day",
"days before" : 2,
"aggregates" : [
{"date" : "2018-03-03",
"sum" : 120},
{"date" :"2018-03-04",
"sum" : 203}
]
}
我创建了一个看起来像这样的流:
val eventStream : DataStream [String] =
eventStream
.addSource(source)
.keyBy("")
.TimeWindow(Time.days(1), Time.days(1))
.trigger(new MyTriggerFunc)
.aggregation(new MyAggregationFunc)
.addSink(sink)
提前感谢您的帮助:)