0

我正在使用来自 Python 和 R 的 Spark/GraphFrames。当我在 Python 的小图上调用 PageRank 时,它比使用 R 慢很多。考虑到 Python 和 R 都在调用相同的图书馆?

我将尝试在下面演示这个问题。

Spark/GraphFrames 包括图的示例,例如朋友如此链接中所述。这是一个非常小的有向图,有 6 个节点和 8 条边(请注意,该示例与其他版本的 GraphFrames 相比并不相同)。

在此处输入图像描述

当我用 R 运行以下代码时,几乎不需要时间来计算 PageRank:

library(graphframes)
library(sparklyr)
library(dplyr)

nodes <- read.csv('nodes.csv')
edges <- read.csv('edges.csv')

sc <- spark_connect(master = "local", version = "2.1.1")

nodes_tbl <- copy_to(sc, nodes)
edges_tbl <- copy_to(sc, edges)

graph <- gf_graphframe(nodes_tbl, edges_tbl)
ranks <- gf_pagerank(graph, reset_probability = 0.15, tol = 0.01)
print(ranks$vertices)

results <- as.data.frame(ranks$vertices)
results <- arrange(results, id)
results$pagerank <- results$pagerank / sum(results$pagerank)

print(results)

当我使用 PySpark 运行等效程序时,需要 10 到 30 分钟:

from pyspark.sql import SparkSession
from graphframes.examples import Graphs

if __name__ == '__main__':

    sc = SparkSession.builder.master("local").getOrCreate()
    g = Graphs(sc).friends()
    results = g.pageRank(resetProbability=0.15, tol=0.01)
    results.vertices.select("id", "pagerank").show()
    results.edges.select("src", "dst", "weight").show()

我尝试了不同版本的 Spark 和 GraphFrames for Python 以与 R 的设置保持一致。

4

1 回答 1

3

一般来说,当您看到在不同后端中明显等效的代码片段之间存在如此显着的运行时差异时,您必须考虑两种可能性:

  • 没有真正的等价物。尽管在底层使用了相同的 Java 库,但不同语言用于与 JVM 交互的路径并不相同,并且当代码到达 JVM 时,它可能不会使用相同的调用链。
  • 这些方法是等效的,但配置和/或数据分布不一样。

在这种特殊情况下,第一个也是最明显的原因是您如何加载数据。

但是,据我所知,在这种特殊情况下,这些选项不应该影响运行时。此外,在这两种情况下,代码到达 JVM 后端之前的路径似乎并没有足以解释差异。

这表明问题出在配置的某个地方。一般来说,至少有两个选项可以显着影响数据分布,从而影响执行时间:

  • spark.default.parallelism- 与 RDD API 一起用于确定不同情况下的分区数量,包括默认的后随机分配。有关可能的影响,请参见例如Spark 迭代时间在使用 join 时呈指数增长

    看起来它不会影响您的代码。

  • spark.sql.shuffle.partitions- 与DatasetAPI 一起用于确定随机播放后的分区数(groupByjoin等)。

    虽然 PageRank 代码使用旧的 GraphX API,并且此参数在此不直接适用,但在将数据传递给旧 API 之前,涉及使用API索引边和顶点Dataset

    如果您检查源代码,您会看到两者都indexedEdges使用indexVertices连接,因此依赖于spark.sql.shuffle.partitions.

    此外,上述方法设置的分区数量将由 GraphXGraph对象继承,显着影响执行时间。

    如果设置spark.sql.shuffle.partitions为最小值:

    spark: SparkSession
    spark.conf.set("spark.sql.shuffle.partitions", 1)
    

    这种小数据的执行时间应该可以忽略不计。

结论

您的环境可能使用不同的spark.sql.shuffle.partitions.

一般方向

如果您看到这样的行为,并且想粗略地缩小问题范围,您应该查看 Spark UI,看看哪里有分歧。在这种情况下,您可能会看到明显不同数量的任务。

于 2018-10-06T11:43:01.970 回答