我想实现一个具有以下访问模式的算法(类似于有限差分算法):
在此示例中,dataset_1 的第一个值用于计算 dataset_2 的第一个和第二个值。所以,对于这个值,我应该有 2 个不同的键。因此,必须多次读取dataset_1 的某些值(2 或 3 次)。
我想我必须使用groupBy(key).reduce(Algorithm)
转换,但我不知道如何定义键。
我想实现一个具有以下访问模式的算法(类似于有限差分算法):
在此示例中,dataset_1 的第一个值用于计算 dataset_2 的第一个和第二个值。所以,对于这个值,我应该有 2 个不同的键。因此,必须多次读取dataset_1 的某些值(2 或 3 次)。
我想我必须使用groupBy(key).reduce(Algorithm)
转换,但我不知道如何定义键。
Flink DataSet 没有排序,除非您在单个线程中处理它们,即并行度为 1。您可以为每个数据添加顺序索引并将该索引用作键。
从您的示例中,我假设 dataset_2 的索引为 4 的值是根据 dataset_1 的值 3、4 和 5 计算得出的,即 dataset_2 的每个值都来自 dataset_1 的三个(或两个)值。
有多种方法可以做你想做的事,有些很容易实现,有些更有效。
一种简单的方法是FlatMapFunction
在 dataset_1 上应用 a ,它使用三个键、和发出带有索引的每个值i
三次。之后,您将结果数据集分组到新键上,并使用 GroupReduce 函数来计算新值。这种方法将 dataset_1 的数据量增加了三倍,但可以轻松并行化。i-1
i
i+1
另一种选择是进行手动范围分区,这类似于第一种方法,但更通用一些。我再次假设 dataset_1 的值具有顺序idx
属性。使用 FlatMapFunction 分配partitionIds
值,即,对于 100 个元素的分区大小,执行类似partitionId = idx / 100
. 分区的第一个和最后一个元素需要发出两次。例如idx
,partitionId 1(值 100 到 199)的元素为 100 和 199,需要通过两次发出这些值来分别复制到分区 0 和 2。partitionIds
分配后,您可以groupBy(partitionId)
、sortGroup(idx)
、 和groupReduce
分区的所有元素。分区的大小是可配置的。
这听起来像是一个滑动窗口计算。您应该使用我们DataStream
而不是DataSet
应用大小为 3 和步长为 1 的窗口。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream dataset_2 = env.readTextFile(textPathTo-Dataset_1).window(Count.of(3)).every(Count.of(1)).WINDOW_FUNCTION(...).flatten();
有多个WINDOW_FUNCTION
可用(例如,mxn、min、sum 或通用 mapWindow、foldWindow、reduceWindow)。请查看哪个功能适合您的用例的文档:https ://ci.apache.org/projects/flink/flink-docs-release-0.9/apis/streaming_guide.html