小鬼。提示 :
每当您有重量级初始化应该为多个RDD
元素执行一次而不是每个RDD
元素一次,并且如果此初始化(例如从第三方库创建对象)无法序列化(以便 Spark 可以将其跨集群传输到工作节点),使用mapPartitions()
而不是
map()
. mapPartitions()
规定每个工作任务/线程/分区执行一次初始化,而不是每个RDD
数据元素执行一次,例如:见下文。
val newRd = myRdd.mapPartitions(partition => {
val connection = new DbConnection /*creates a db connection per partition*/
val newPartition = partition.map(record => {
readMatchingFromDB(record, connection)
}).toList // consumes the iterator, thus calls readMatchingFromDB
connection.close() // close dbconnection here
newPartition.iterator // create a new iterator
})
Q2。表现得像地图还是像?flatMap
mapPartitions
是的。请参阅flatmap
.. 的示例 2,其不言自明。
Q1。RDD和RDD有什么区别map
mapPartitions
map
在每个元素级别
mapPartitions
执行正在使用的功能,同时在分区级别执行功能。
示例场景: 如果我们在特定RDD
分区中有 100K 元素,那么我们将在使用 100K 次时触发映射转换正在使用的函数map
。
相反,如果我们使用mapPartitions
then 我们只会调用特定函数一次,但我们将传入所有 100K 记录并在一次函数调用中取回所有响应。
由于在特定函数上工作了很多次,因此性能会有所提高map
,特别是如果函数每次都在做一些昂贵的事情,而如果我们一次传入所有元素(在这种情况下)它就不需要做mappartitions
。
地图
对 RDD 的每一项应用一个转换函数,并将结果作为一个新的 RDD 返回。
列出变体
def map[U: ClassTag](f: T => U): RDD[U]
例子 :
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.map(_.length)
val c = a.zip(b)
c.collect
res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8))
地图分区
这是一个专门的映射,每个分区只调用一次。各个分区的全部内容可通过输入参数 (Iterarator[T]) 作为顺序值流获得。自定义函数必须返回另一个 Iterator[U]。组合的结果迭代器会自动转换为新的 RDD。请注意,由于我们选择的分区,以下结果中缺少元组 (3,4) 和 (6,7)。
preservesPartitioning
指示输入函数是否保留分区器,false
除非这是对 RDD 并且输入函数不修改键,否则应该保留。
列出变体
def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
示例 1
val a = sc.parallelize(1 to 9, 3)
def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
var res = List[(T, T)]()
var pre = iter.next
while (iter.hasNext)
{
val cur = iter.next;
res .::= (pre, cur)
pre = cur;
}
res.iterator
}
a.mapPartitions(myfunc).collect
res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))
示例 2
val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9,10), 3)
def myfunc(iter: Iterator[Int]) : Iterator[Int] = {
var res = List[Int]()
while (iter.hasNext) {
val cur = iter.next;
res = res ::: List.fill(scala.util.Random.nextInt(10))(cur)
}
res.iterator
}
x.mapPartitions(myfunc).collect
// some of the number are not outputted at all. This is because the random number generated for it is zero.
res8: Array[Int] = Array(1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 5, 7, 7, 7, 9, 9, 10)
上面的程序也可以使用 flatMap 编写如下。
使用平面图的示例 2
val x = sc.parallelize(1 to 10, 3)
x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect
res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10)
结论 :
mapPartitions
转换比map
因为它调用你的函数一次/分区,而不是一次/元素..
延伸阅读:foreach Vs foreachPartitions 什么时候用呢?