158

RDD mapmapPartitions方法有什么区别?并且flatMap表现得像map还是像mapPartitions?谢谢。

(编辑)即两者之间有什么区别(在语义上或在执行方面)

  def map[A, B](rdd: RDD[A], fn: (A => B))
               (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
    rdd.mapPartitions({ iter: Iterator[A] => for (i <- iter) yield fn(i) },
      preservesPartitioning = true)
  }

和:

  def map[A, B](rdd: RDD[A], fn: (A => B))
               (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
    rdd.map(fn)
  }
4

4 回答 4

164

小鬼。提示 :

每当您有重量级初始化应该为多个RDD元素执行一次而不是每个RDD元素一次,并且如果此初始化(例如从第三方库创建对象)无法序列化(以便 Spark 可以将其跨集群传输到工作节点),使用mapPartitions()而不是 map(). mapPartitions()规定每个工作任务/线程/分区执行一次初始化,而不是每个RDD数据元素执行一次,例如:见下文。

val newRd = myRdd.mapPartitions(partition => {
  val connection = new DbConnection /*creates a db connection per partition*/

  val newPartition = partition.map(record => {
    readMatchingFromDB(record, connection)
  }).toList // consumes the iterator, thus calls readMatchingFromDB 

  connection.close() // close dbconnection here
  newPartition.iterator // create a new iterator
})

Q2。表现得像地图还是flatMapmapPartitions

是的。请参阅flatmap.. 的示例 2,其不言自明。

Q1。RDD和RDD有什么区别mapmapPartitions

map在每个元素级别 mapPartitions执行正在使用的功能,同时在分区级别执行功能。

示例场景 如果我们在特定RDD分区中有 100K 元素,那么我们将在使用 100K 次时触发映射转换正在使用的函数map

相反,如果我们使用mapPartitionsthen 我们只会调用特定函数一次,但我们将传入所有 100K 记录并在一次函数调用中取回所有响应。

由于在特定函数上工作了很多次,因此性能会有所提高map,特别是如果函数每次都在做一些昂贵的事情,而如果我们一次传入所有元素(在这种情况下)它就不需要做mappartitions

地图

对 RDD 的每一项应用一个转换函数,并将结果作为一个新的 RDD 返回。

列出变体

def map[U: ClassTag](f: T => U): RDD[U]

例子 :

val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
 val b = a.map(_.length)
 val c = a.zip(b)
 c.collect
 res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8)) 

地图分区

这是一个专门的映射,每个分区只调用一次。各个分区的全部内容可通过输入参数 (Iterarator[T]) 作为顺序值流获得。自定义函数必须返回另一个 Iterator[U]。组合的结果迭代器会自动转换为新的 RDD。请注意,由于我们选择的分区,以下结果中缺少元组 (3,4) 和 (6,7)。

preservesPartitioning指示输入函数是否保留分区器,false除非这是对 RDD 并且输入函数不修改键,否则应该保留。

列出变体

def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]

示例 1

val a = sc.parallelize(1 to 9, 3)
 def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
   var res = List[(T, T)]()
   var pre = iter.next
   while (iter.hasNext)
   {
     val cur = iter.next;
     res .::= (pre, cur)
     pre = cur;
   }
   res.iterator
 }
 a.mapPartitions(myfunc).collect
 res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8)) 

示例 2

val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9,10), 3)
 def myfunc(iter: Iterator[Int]) : Iterator[Int] = {
   var res = List[Int]()
   while (iter.hasNext) {
     val cur = iter.next;
     res = res ::: List.fill(scala.util.Random.nextInt(10))(cur)
   }
   res.iterator
 }
 x.mapPartitions(myfunc).collect
 // some of the number are not outputted at all. This is because the random number generated for it is zero.
 res8: Array[Int] = Array(1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 5, 7, 7, 7, 9, 9, 10) 

上面的程序也可以使用 flatMap 编写如下。

使用平面图的示例 2

val x  = sc.parallelize(1 to 10, 3)
 x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect

 res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10) 

结论 :

mapPartitions转换比map因为它调用你的函数一次/分区,而不是一次/元素..

延伸阅读:foreach Vs foreachPartitions 什么时候用呢?

于 2016-08-29T10:17:17.143 回答
130

RDD 的 map 和 mapPartitions 方法有什么区别?

map方法通过应用函数将源 RDD 的每个元素转换为结果 RDD 的单个元素。mapPartitions将源 RDD 的每个分区转换为结果的多个元素(可能没有)。

flatMap 的行为是像 map 还是像 mapPartitions?

两者都不是,flatMap适用于单个元素 (as map) 并产生结果的多个元素 (as mapPartitions)。

于 2014-01-17T19:46:35.897 回答
15

地图

  1. 它一次处理一行,非常类似于 MapReduce 的 map() 方法。
  2. 您在每一行之后从转换中返回。

地图分区

  1. 它一次性处理完整的分区。
  2. 处理整个分区后,您只能从函数返回一次。
  3. 所有中间结果都需要保存在内存中,直到您处理整个分区。
  4. 提供你喜欢的 MapReduce 的 setup() map() 和 cleanup() 函数

Map Vs mapPartitions http://bytepadding.com/big-data/spark/spark-map-vs-mappartitions/

Spark Map http://bytepadding.com/big-data/spark/spark-map/

Spark mapPartitions http://bytepadding.com/big-data/spark/spark-mappartitions/

于 2017-03-13T00:09:25.737 回答
0

地图:

地图变换。

地图一次只在一行上工作。

Map 在每个输入行之后返回。

该映射不将输出结果保存在内存中。

地图没有办法弄清楚然后终止服务。

// map example

val dfList = (1 to 100) toList

val df = dfList.toDF()

val dfInt = df.map(x => x.getInt(0)+2)

display(dfInt)

地图分区:

MapPartition 转换。

MapPartition 一次在一个分区上工作。

MapPartition 在处理完分区中的所有行后返回。

MapPartition 输出保留在内存中,因为它可以在处理特定分区中的所有行后返回。

MapPartition 服务可以在返回之前关闭。

// MapPartition example

Val dfList = (1 to 100) toList

Val df = dfList.toDF()

Val df1 = df.repartition(4).rdd.mapPartition((int) => Iterator(itr.length))

Df1.collec()

//display(df1.collect())

有关详细信息,请参阅Spark 映射与 mapPartitions 转换文章。

希望这有帮助!

于 2021-02-17T06:19:21.160 回答