0

我有一个关于资源的事件流,如下所示:

id, type,      count
1,  view,      1
1,  download,  3
2,  view,      1
3,  view,      1
1,  download,  2
3,  view,      1

我正在尝试为每个资源生成统计信息(总计),所以如果我得到一个像上面这样的流,结果应该是:

id, views, downloads
1,  1,     5
2,  1,     0
3,  2,     0

现在我写了一个 ProcessFunction 来计算总数,如下所示:

public class CountTotals extends ProcessFunction<Event, ResourceTotals> {
    private ValueState<ResourceTotals> totalsState;

    @Override
    public void open(Configuration config) throws Exception {
        ValueStateDescriptor<ResourceTotals> totalsDescriptor = new ValueStateDescriptor<>("totals state", ResourceTotals.class);
        totalsDescriptor.setQueryable("resource-totals");
        totalsState = getRuntimeContext().getState(totalsDescriptor);
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<ResourceTotals> out) throws Exception {
        ResourceTotals totals = totalsState.value();
        if (totals == null) {
            totals = new ResourceTotals();
            totals.id = event.id;
        }
        switch (event.type) {
            case "view":
                totals.views += event.count;
                break;
            case "download":
                totals.downloads += event.count;
        }
        totalsState.update(totals);
        out.collect(totals);
    }
}

从代码中可以明显看出,它将为每个事件发出一个新的 ResourceTotals,但我想每分钟发出一次每个资源的总数,而不是更频繁。

我尝试使用全局窗口和触发器(ContinuousProcessingTimeTrigger)进行试验,但无法使其工作。我遇到的问题是:

  1. 如何表达我想要窗口的最后一个事件?
  2. 如何不最终存储在该全局窗口中生成的所有 ResourceTotals?

任何帮助,将不胜感激。

4

1 回答 1

2

您可以使用计时器每分钟发出一次 totalsState 中的值。由于我在您的数据流中看不到任何时间戳,我想您会使用处理时间计时器。

另一种方法是将 ProcessFunction 替换为 TimeWindow 以及保留最后一个事件的ReduceFunction

在任何一种情况下,您都可以考虑通过 ID 和类型字段来键入流,这应该会稍微简化您的状态管理。

更新:

是的,定时器是 Flink 检查点和恢复状态的一部分。

于 2018-04-03T19:00:26.000 回答