3

我实际上正在处理一个流,接收一堆字符串,需要计算所有字符串。总和是加总的,这意味着对于第二条记录,总和被添加到输出前一天必须是一些看起来像的 json 文件

{
"aggregationType" : "day",
"days before" : 2,
"aggregates" : [
    {"date" : "2018-03-03",
    "sum" : 120},
  {"date" :"2018-03-04",
  "sum" : 203}
  ]
}

我创建了一个看起来像这样的流:

val eventStream : DataStream [String] = 
eventStream
    .addSource(source)
    .keyBy("")
    .TimeWindow(Time.days(1), Time.days(1))
    .trigger(new MyTriggerFunc)
    .aggregation(new MyAggregationFunc)
    .addSink(sink)

提前感谢您的帮助:)

4

1 回答 1

1

在 Flink 中使用 JSON 的注意事项:

用于JSONDeserializationSchema反序列化事件,这将产生ObjectNodes. 为方便起见,您可以将 映射ObjectNodeYourObject或继续使用ObjectNode.

使用教程ObjectNodehttp ://www.baeldung.com/jackson-json-node-tree-model

回到你的情况,你可以这样做:

val eventStream : DataStream [ObjectNode] = 
oneMinuteAgg
    .addSource(source)
    .windowAll()
    .TimeWindow(Time.minutes(1))
    .trigger(new MyTriggerFunc)
    .aggregation(new MyAggregationFunc)

将输出 1 分钟聚合的流

[     
      {
      "date" :2018-03-03
      "sum" : 120
      }, 
      {
      "date" :2018-03-03
      "sum" : 120
      }
]

然后将另一个运算符链接到“oneMinuteAgg”,它将 1min 聚合添加到 1day 聚合中:

[...]
oneMinuteAgg
        .windowAll()
        .TimeWindow(Time.days(1))
        .trigger(new Whatever)
        .aggregation(new YourDayAggF)

这将输出你需要的东西

{
    "aggregationType" : "day"
    "days before" : 4
    "aggregates : [{
      "date" :2018-03-03
      "sum" : 120
      }, 
      {
      "date" :2018-03-03
      "sum" : 120
      }]
}

我曾经windowAll()假设您不需要键入流。

于 2018-03-20T10:28:31.237 回答