这里有个需求:数据集太大,我们需要对数据进行分区,在每个分区计算一个本地结果,然后合并。例如,如果有 100 万条数据,分成 100 个分区,那么每个副本将只有大约 10000 条数据。由于需要使用分区数进行调优,因此要求分区数是可变的。另外,一个分区的所有数据都必须批量计算,不能一一计算。
实现如下:在分区阶段之后,每条数据都会有一个key来表示它所属的分区。现在,数据应该是这样的:afterPartitionedData=[(0,data1),(0,data2)…(1,data3),(1,data4),…,(99,datan)]
。 接下来,使用 Flink 的partitionCustom
和mapPartition
运算符。
dataSet = env. fromCollection(afterPartitionedData)
dataset
.partitionCustom(new myPartitioner(),0)
.mapPartition(new myMapPartitionFunction[(Int,String),Int]())
…
…
class myPartitioner extends Partitioner[Int]{
override def partition(key: Int, numPartitions: Int) = {
println("numPartitions="+numPartitions) // 6 , CPU number
key // just return the partitionID
}
}
但是,却报错:
...
Caused by: java.lang.ArrayIndexOutOfBoundsException: 6
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.getBufferBuilder(RecordWriter.java:226)
...
这似乎是因为 Flink 默认的分区DataSet
数是 CPU 的个数,在我的电脑上是 6 个,所以会报错 java.lang.ArrayIndexOutOfBoundsException : 6
。
所以我的问题是:有没有办法随意改变分区的数量?Partition (key: int, numpartitions: int)
我在 API Partitioner的方法中找到了这个参数,但不知道如何更改它。
有没有办法改变DataSet
分区的数量?
Flink 版本为 1.6,测试代码为:
object SimpleFlinkFromBlog {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val afterPartitionedData = new mutable.MutableList[(Int, String)]
afterPartitionedData.+=((0, "0"))
afterPartitionedData.+=((1, "1"))
afterPartitionedData.+=((2, "2"))
afterPartitionedData.+=((2, "2"))
afterPartitionedData.+=((3, "3"))
afterPartitionedData.+=((3, "3"))
afterPartitionedData.+=((3, "3"))
afterPartitionedData.+=((4, "4"))
afterPartitionedData.+=((5, "5"))
afterPartitionedData.+=((5, "5"))
afterPartitionedData.+=((5, "5"))
// Comment this line will not report an error.
// java.lang.ArrayIndexOutOfBoundsException : 6
afterPartitionedData.+=((6, "will wrong"))
val dataSet = env.fromCollection( afterPartitionedData )
val localRes = dataSet
.partitionCustom(new myPartitioner(),0)
.mapPartition(new MapPartitionFunction[(Int,String),Int] {
override def mapPartition(values: lang.Iterable[(Int, String)], out: Collector[Int]) = {
var count = 0;
values.forEach(new Consumer[(Int, String)] {
override def accept(t: (Int, String)): Unit = {
count=count+1;
print("current count is " + count + " tuple is " + t + "\n");
}
})
out.collect(count)
}
})
localRes.collect().foreach(println)
}
class myPartitioner extends Partitioner[Int]{
override def partition(key: Int, numPartitions: Int) = {
// println("numPartitions="+numPartitions)
key
}
}
}
谢谢!