最近我正在尝试使用 Apache Flink 进行快速批处理。我有一个带有 column:value 和不相关索引列的表
基本上我想计算每 5 行值的平均值和范围。然后我将根据我刚刚计算的平均值计算平均值和标准差。所以我想最好的方法是使用Tumble
window.
看起来像这样
DataSet<Tuple2<Double, Integer>> rawData = {get the source data};
Table table = tableEnvironment.fromDataSet(rawData);
Table groupedTable = table
.window(Tumble.over("5.rows").on({what should I write?}).as("w")
.groupBy("w")
.select("f0.avg, f0.max-f0.min");
{The next step is to use groupedTable to calculate overall mean and stdDev}
但我不知道该写什么.on()
。我试过"proctime"
了,但它说没有这样的输入。我只希望它按从源读取的顺序分组。但它必须是时间属性,所以我不能使用"f2"
- 索引列也作为排序。
我是否必须添加时间戳才能执行此操作?批处理中是否有必要,它会减慢计算速度吗?解决这个问题的最佳方法是什么?
更新: 我尝试在表格 API 中使用滑动窗口,但它得到了异常。
// Calculate mean value in each group
Table groupedTable = table
.groupBy("f0")
.select("f0.cast(LONG) as groupNum, f1.avg as avg")
.orderBy("groupNum");
//Calculate moving range of group Mean using sliding window
Table movingRangeTable = groupedTable
.window(Slide.over("2.rows").every("1.rows").on("groupNum").as("w"))
.groupBy("w")
.select("groupNum.max as groupNumB, (avg.max - avg.min) as MR");
例外是:
线程“主”java.lang.UnsupportedOperationException 中的异常:当前不支持在事件时间计算滑动组窗口。
在 org.apache.flink.table.plan.nodes.dataset.DataSetWindowAggregate.createEventTimeSlidingWindowDataSet(DataSetWindowAggregate.scala:456)
在 org.apache.flink.table.plan.nodes.dataset.DataSetWindowAggregate.translateToPlan(DataSetWindowAggregate.scala:139)
...
这是否意味着 Table API 不支持滑动窗口?如果我没记错的话,DataSet API 中没有窗口函数。那么如何在批处理中计算移动范围?