1

我有一个链接:JdbcRDD[String],其中包含以下形式的链接:

{"bob,michael"} 

分别为每个链接的源和目标。我可以拆分每个字符串以检索唯一标识源节点和目标节点的字符串。然后我有一个 users:RDD[(Long, Vertex)] ,它包含我图中的所有顶点。每个顶点都有一个 nameId:String 属性和一个 nodeId:Long 属性。

我想从 stringId 中检索 nodeId,但不知道如何实现这个逻辑,在 Scala 和 Spark 中都是相当新的。我被这段代码困住了:

val reflinks = links.map { x =>
    // split each line in an array
    val row = x.split(',')
    // retrieve the id using the row(0) and row(1) values
    val source = users.filter(_._2.stringId == row(0)).collect()
    val dest = users.filter(_._2.stringId == row(1)).collect()
    // return last value
    Edge(source(0)._1, dest(0)._1, "referral")
    // return the link in Graphx format
    Edge(ids(0), ids(1), "ref")
}

有了这个解决方案,我得到:

 org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
4

1 回答 1

0

不幸的是,您不能在 Spark 中嵌套 RDD。也就是说,当您在闭包内发送到另一个 RDD 时,您无法访问一个 RDD。

如果您想将来自多个 RDD 的知识结合起来,您需要以join某种方式将它们结合起来。这是解决此问题的一种方法:

import org.apache.spark.graphx._
import org.apache.spark.SparkContext._

// These are some toy examples of the original data for the edges and the vertices
val rawEdges = sc.parallelize(Array("m,a", "c,a", "g,c"))
val rawNodes = sc.parallelize(Array( ("m", 1L), ("a", 2L), ("c", 3L), ("g", 4L)))

val parsedEdges: RDD[(String, String)] = rawEdges.map(x => x.split(",")).map{ case Array(x,y) => (x,y) }

// The two joins here are required since we need to get the ID for both nodes of each edge
// If you want to stay in the RDD domain, you need to do this double join.
val resolvedFirstRdd = parsedEdges.join(rawNodes).map{case (firstTxt,(secondTxt,firstId)) => (secondTxt,firstId) }
val edgeRdd = resolvedFirstRdd.join(rawNodes).map{case (firstTxt,(firstId,secondId)) => Edge(firstId,secondId, "ref") }

// The prints() are here for testing (they can be expensive to keep in the actual code)
edgeRdd.foreach(println)
val g = Graph(rawNodes.map(x => (x._2, x._1)), edgeRdd)

println("In degrees")
g.inDegrees.foreach(println) 
println("Out degrees")
g.outDegrees.foreach(println) 

用于测试的打印输出:

Edge(3,2,ref)
Edge(1,2,ref)
Edge(4,3,ref)
In degrees
(3,1)
(2,2)
Out degrees
(3,1)
(1,1)
(4,1)
于 2015-11-29T03:26:28.063 回答