0

我正在做一个包含大数据的程序,这就是我使用 Spark 和 Scala 的原因。我需要对数据库进行分区,为此我使用

var data0 = conf.dataBase.repartition (8) .persist (StorageLevel.MEMORY_AND_DISK_SER)

但是然后我需要在分区中做一些事情,然后再继续使用与该分区对应的数据库,为此我使用

var tester = data0.mapPartitions {x =>
   configFuzzyPredProblem ()
   Strategy.getStrategy.executeStrategy (conf.iterByRun, 5, GeneratorType.HillClimbing)
 } .persist (StorageLevel.MEMORY_AND_DISK_SER)

executeStrategy()我使用数据库的方法中,但我不知道它是全局数据库还是与该分区对应的数据库。如何知道我使用的是哪一个,然后只使用该分区的数据库执行分区处理?

4

1 回答 1

2

这是一个使用 mapPartitionsWithIndex 的简单示例,它遵循与 mapPartitions 相同的规则——不包括索引方面。

您可以看到在 mapPartitions 中您需要处理一个可交互对象,在本示例中是一个 Interator Int。在这种情况下,处理了 3 个分区,在您的情况下为 8,其中包含一些条目或可能为零的条目。

val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3)
def myfunc(index: Int, iter: Iterator[Int]) : Iterator[String] = {
    iter.map(x => index + "," + x)
}
val rdd2 = rdd1.mapPartitionsWithIndex(myfunc)

我看不到你的函数内部,但我认为它没问题,它将处理一个分区 - 你的数据库的一部分。

于 2018-12-04T19:36:57.623 回答