我有一个用 Spark GraphX (Scala 2.10) 和其他 Spark 库编写的脚本,用于处理 Wikipedia 转储的 PageRank 分数并检索最高结果。我可以通过将脚本放在示例文件夹中并插入所需的依赖项来让脚本在本地运行。但我找不到将其编译为应用程序以便在 Amazon EC2 上运行的方法。
我努力了:
- 为整个 Spark 项目编译 fat jar,将我的脚本作为一个新类,并从示例运行该类:org.apache.spark.examples.graphx.PageRankGraphX(非常“迂回”的成功编译路径和最终失败集群 - SparkException:应用程序以失败状态完成)。这是我在本地设置中最成功的方法,但我认为我的方法不是标准的。
- 将脚本添加到源代码中的另一个库,例如 GraphX(这使得无法编译 - 找不到许多库项,例如:java.lang.NoClassDefFoundError: com/google/common/util/concurrent/ThreadFactoryBuilder)
- 在没有 Spark 源的情况下构建一个全新的项目,仅在 sbt 文件中添加我的脚本作为源代码以及 Spark 依赖项(例如 Spark 核心、Spark 流、Spark GraphX 等) - 这可以编译,但是当我尝试运行应用程序在运行时失败,因为它缺少几乎所有依赖项的代码(例如 ClassDefNotFoundException:Scala/Serializer)。
我希望有人有一些易于遵循的说明,说明如何运行一个用 GraphX 编写的独立 jar,它将在 Amazon EMR 上处理。我的整个脚本发布在下面 - 如果这是你的,你会怎么做才能让它在 AWS 上运行并生成一个输出文件?我认为这里的任何细节都不太基本:
- 使用我的脚本作为其中的新类构建整个下载的 Spark 项目的正确方法是否正确?如果是这样,在 Spark 源代码中,脚本属于哪个文件夹?
- 还是在全新项目中构建它的正确方法?如果是这样,如何确保 sbt 包含运行时可能需要的每个依赖项,以便应用程序正确运行?
- 否则,如果这些都不正确,我应该如何处理我的脚本才能在 Amazon WS EMR 上成功运行它?我之前在 MR 中运行过集群项目,没有遇到这种麻烦。
我正在尝试运行的脚本如下 - 我可以肯定地确认,当它的所有依赖项都正确编译时,它可以正常工作 - 但是,我的编译版本在 EMR 中的实际工作中仍然失败。
package org.apache.spark.graphx
import java.io._
import java.nio.charset.StandardCharsets
import java.security.MessageDigest
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.graphx.lib.PageRank
import org.apache.spark.rdd.RDD
object PageRankGraph extends Serializable {
def hashId(str: String): Long = {
val bytes = MessageDigest.getInstance("MD5").digest(str.getBytes(StandardCharsets.UTF_8))
(bytes(0) & 0xFFL) |
((bytes(1) & 0xFFL) << 8) |
((bytes(2) & 0xFFL) << 16) |
((bytes(3) & 0xFFL) << 24) |
((bytes(4) & 0xFFL) << 32) |
((bytes(5) & 0xFFL) << 40) |
((bytes(6) & 0xFFL) << 48) |
((bytes(7) & 0xFFL) << 56)
}
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
.setAppName("WikipediaGraphXPageRank")
.setMaster(args(5))
.set("spark.executor.memory","1g")
val sc = new SparkContext(sparkConf)
val topics: RDD[String] = (sc.textFile(args(0))
.map(line => line.split("\t")).map(parts => (parts.head)))
val vertices = topics map (topic => hashId(topic) -> topic)
val uniqueHashes = vertices.map(_._1).countByValue()
val uniqueTopics = vertices.map(_._2).countByValue()
uniqueHashes.size == uniqueTopics.size
val linksall = (sc.textFile(args(0))).map(l => l.split("\t"))
val links = for (l <- linksall; l2 <- l(2).split(",")) yield (l(0), l2)
val edges = for (l <- links) yield Edge(hashId(l._1), hashId(l._2), 0)
val graph = Graph(vertices, edges, "").cache()
graph.vertices.count
if (args(4).toInt == 1) graph.partitionBy(PartitionStrategy.RandomVertexCut)
else if (args(4).toInt == 2) graph.partitionBy(PartitionStrategy.EdgePartition2D)
else if (args(4).toInt == 3) graph.partitionBy(PartitionStrategy.CanonicalRandomVertexCut)
else graph.partitionBy(PartitionStrategy.EdgePartition1D)
var prGraph = PageRank.run(graph, args(2).toInt, args(3).toDouble)
val titleAndPrGraph = graph.outerJoinVertices(prGraph.vertices) {
(v, title, rank) => (rank.getOrElse(0.0), title)
}
val pw = new PrintWriter(new File(args(1)))
titleAndPrGraph.vertices.top(100) {
Ordering.by((entry: (VertexId, (Double, String))) => entry._2._1)
}.foreach(t =>
if(t._2._2 != "") {
pw.write("\nTitle: " + t._2._2 + " : " + t._2._1 + "\n")
})
pw.close()
}
}
任何你能对此有所了解的人都会非常有帮助。就像我说的那样,我发现我读过的几乎所有教程和指南都缺乏详细程度,所以你能负担得起的越多越好(PS 我已经在 Windows 机器上尝试了以上所有方法)。