0

我在与其他用户共享的集群上使用 Spark。因此,仅根据运行时间来判断我的哪个代码运行效率更高是不可靠的。因为当我运行更高效的代码时,其他人可能会运行大量数据并使我的代码执行更长时间。

所以我可以在这里问两个问题:

  1. 我正在使用join函数加入 2RDDs并且在使用groupByKey()之前尝试使用join,如下所示:

    rdd1.groupByKey().join(rdd2)
    

    似乎花了更长的时间,但是我记得当我使用 Hadoop Hive 时,group by 使我的查询运行得更快。由于 Spark 使用延迟评估,我想知道groupByKeybefore是否join让事情变得更快

  2. 注意到Spark有一个SQL模块,到现在还真没时间去尝试,请问SQL模块和RDD SQL之类的函数有什么区别?

4

2 回答 2

5
  1. 没有充分的理由groupByKey跟随joinjoin单独更快。如果rdd1并且rdd2没有分区器或分区器不同,那么限制因素就是HashPartitioning.

    通过使用groupByKey,您不仅通过保持分组所需的可变缓冲区来增加总成本,而且更重要的是,您使用了额外的转换,这会导致更复杂的 DAG。groupByKey+ join:

    rdd1 = sc.parallelize([("a", 1), ("a", 3), ("b", 2)])
    rdd2 = sc.parallelize([("a", 5), ("c", 6), ("b", 7)])
    rdd1.groupByKey().join(rdd2)
    

    在此处输入图像描述

    join单独相比:

    rdd1.join(rdd2)
    

    在此处输入图像描述

    最后,这两个计划甚至不是等效的,要获得相同的结果,您必须在第一个计划的基础上再添加flatMap一个。

  2. 这是一个相当广泛的问题,但要强调主要区别:

    • PairwiseRDDs是任意Tuple2元素的同质集合。对于默认操作,您希望 key 以有意义的方式可散列,否则对类型没有严格的要求。相比之下,DataFrame 表现出更多的动态类型,但每列只能包含来自一组受支持的已定义类型的值。可以定义UDT,但仍然必须使用基本的 UDT 来表示。

    • DataFrames 使用Catalyst Optimizer,它生成逻辑和物理执行计划,并且可以生成高度优化的查询,而无需应用手动低级优化。基于 RDD 的操作简单地遵循依赖 DAG。这意味着在没有自定义优化的情况下性能更差,但对执行的控制要好得多,并且有一些微调的潜力。

其他一些要阅读的东西:

于 2015-10-26T11:06:14.243 回答
4

我大多同意 zero323 的回答,但我认为有理由期望. 减少数据量,按key对数据进行分区。这两者都有助于后续.joingroupByKeygroupByKeyjoin

我认为前者(减少数据大小)并不重要。为了获得后者(分区)的好处,您需要以相同的方式对其他 RDD 进行分区。

例如:

val a = sc.parallelize((1 to 10).map(_ -> 100)).groupByKey()
val b = sc.parallelize((1 to 10).map(_ -> 100)).partitionBy(a.partitioner.get)
a.join(b).collect

DAG 可视化

于 2015-10-26T12:54:48.200 回答