0

我正在使用带有 scala 的 Spark。我想知道单行命令是否比单独的命令更好?如果有的话有什么好处?它是否在速度方面获得了更高的效率?为什么?

例如

var d = data.filter(_(1)==user).map(f => (f(2),f(5).toInt)).groupByKey().map(f=> (f._1,f._2.count(x=>true), f._2.sum))

反对

var a = data.filter(_(1)==user)
var b = a.map(f => (f(2),f(5).toInt))
var c = b.groupByKey()
var d = c.map(f=> (f._1,f._2.count(x=>true), f._2.sum))
4

2 回答 2

5

您的两个示例之间没有性能差异;链接 RDD 转换或显式表示中间 RDD 的决定只是风格问题。take()Spark 的惰性求值意味着在调用类似or的 RDD 操作之前不会执行实际的分布式计算count()

在执行期间,Spark 将流水线化尽可能多的转换。对于您的示例,Spark 在映射之前不会具体化整个过滤数据集:filter()map()转换将一起流水线化并在单个阶段执行。转换(通常)需要通过groupByKey()网络对数据进行混洗,因此它在单独的阶段执行。filter()只有当它是cache()d时,Spark 才会实现输出。

如果要缓存中间 RDD 并对其执行进一步处理,则可能需要使用第二种样式。例如,如果我想对groupByKey()转换的输出执行多个操作,我会写类似

val grouped = data.filter(_(1)==user)
                  .map(f => (f(2),f(5).toInt))
                  .groupByKey()
                  .cache()
val mapped = grouped.map(f=> (f._1,f._2.count(x=>true), f._2.sum))
val counted = grouped.count()
于 2013-10-13T01:25:14.717 回答
0

在执行方面没有区别,但您可能需要考虑代码的可读性。我会用你的第一个例子,但像这样:

var d = data.filter(_(1)==user)
.map(f => (f(2),f(5).toInt))
.groupByKey()
.map(f=> (f._1,f._2.count(x=>true), f._2.sum))

确实,这比 Spark 更像是一个 Scala 问题。尽管如此,从 Spark 的字数统计实现中可以看出,如他们的文档中所示

val file = spark.textFile("hdfs://...")
val counts = file.flatMap(line => line.split(" "))
                 .map(word => (word, 1))
                 .reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://...")

你不需要担心这些事情。Scala 语言(通过惰性等)和 Spark 的 RDD 实现在更高的抽象级别处理所有这些。

如果您发现性能确实很差,那么您应该花时间探索原因。正如Knuth所说,“过早的优化是万恶之源”。

于 2013-10-13T01:20:39.393 回答