1

最近我正在尝试使用 Apache Flink 进行快速批处理。我有一个带有 column:value 和不相关索引列的表

基本上我想计算每 5 行值的平均值和范围。然后我将根据我刚刚计算的平均值计算平均值和标准差。所以我想最好的方法是使用Tumblewindow.

看起来像这样

DataSet<Tuple2<Double, Integer>> rawData = {get the source data};
Table table = tableEnvironment.fromDataSet(rawData);
Table groupedTable = table
            .window(Tumble.over("5.rows").on({what should I write?}).as("w")
            .groupBy("w")
            .select("f0.avg, f0.max-f0.min");

{The next step is to use groupedTable to calculate overall mean and stdDev} 

但我不知道该写什么.on()。我试过"proctime"了,但它说没有这样的输入。我只希望它按从源读取的顺序分组。但它必须是时间属性,所以我不能使用"f2"- 索引列也作为排序。

我是否必须添加时间戳才能执行此操作?批处理中是否有必要,它会减慢计算速度吗?解决这个问题的最佳方法是什么?

更新: 我尝试在表格 API 中使用滑动窗口,但它得到了异常。

// Calculate mean value in each group
    Table groupedTable = table
            .groupBy("f0")
            .select("f0.cast(LONG) as groupNum, f1.avg as avg")
            .orderBy("groupNum");

//Calculate moving range of group Mean using sliding window
    Table movingRangeTable = groupedTable
            .window(Slide.over("2.rows").every("1.rows").on("groupNum").as("w"))
            .groupBy("w")
            .select("groupNum.max as groupNumB, (avg.max - avg.min) as MR");

例外是:

线程“主”java.lang.UnsupportedOperationException 中的异常:当前不支持在事件时间计算滑动组窗口。

在 org.apache.flink.table.plan.nodes.dataset.DataSetWindowAggregate.createEventTimeSlidingWindowDataSet(DataSetWindowAggregate.scala:456)

在 org.apache.flink.table.plan.nodes.dataset.DataSetWindowAggregate.translateToPlan(DataSetWindowAggregate.scala:139)

...

这是否意味着 Table API 不支持滑动窗口?如果我没记错的话,DataSet API 中没有窗口函数。那么如何在批处理中计算移动范围?

4

1 回答 1

0

window子句用于定义基于窗口函数的分组,例如Tumbleor Session。除非您指定行的顺序,否则在 Table API(或 SQL)中没有很好地定义每 5 行分组。这是在函数的on子句中完成的Tumble。由于此功能源自流处理,因此该on子句需要时间戳属性。

currentTimestamp()您可以使用该函数获取当前时间的时间戳。但是,我应该指出,Flink 会对数据进行排序,因为它不知道函数的单调性。此外,所有这些都将具有 1 的并行度,因为没有允许分区的子句。

或者,您还可以实现一个用户定义的标量函数,将索引属性转换为时间戳(实际上是 Long 值)。但同样,Flink 会对数据进行完整分类。

于 2018-06-20T09:37:18.867 回答