我正在学习 Apache Spark 和 GraphFrames，并尝试使用 GraphFrames 用户指南（https://graphframes.github.io/user-guide.html）中提供的代码，让 shortestPaths 在我的 6 节点集群上运行：
// GraphFrames user-guide example: compute shortest paths from every vertex
// to the landmark vertices "a" and "d" of the bundled "friends" graph.
//
// The GraphFrame type must be imported alongside `examples`; importing only
// `examples` leaves the `g: GraphFrame` annotation below unresolved.
//
// NOTE(review): on a cluster, the graphframes classes must also be available to
// every executor (e.g. spark-submit --packages graphframes:graphframes:<version>
// or a fat jar). Otherwise task deserialization on the executors fails even
// though the same code works in local mode.
import org.graphframes.{examples, GraphFrame}

val g: GraphFrame = examples.Graphs.friends // get example graph

// Per the GraphFrames docs, `distances` maps each landmark id to the path
// length from the row's vertex; unreachable landmarks are presumably omitted.
val results = g.shortestPaths.landmarks(Seq("a", "d")).run()
results.select("id", "distances").show()
在本地运行时，shortestPaths 可以正常工作；但在集群上运行时，我先收到以下警告，随后出现一个错误：
WARN TaskSetManager: Lost task 0.0 in stage 12.0 (TID 40, 192.168.0.51, executor 4):
java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field
org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2127)
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2022)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1940)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2016)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1940)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1907)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2016)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1940)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2016)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1940)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1907)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2016)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1940)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2016)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1940)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1907)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2016)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1940)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2016)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1940)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:80)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
这是我一直在使用的 spark-submit 命令：
# Submit the PSKQ application to the standalone cluster master.
#
# --packages: graphframes is not part of the Spark distribution, and
#   `sbt package` does not bundle library dependencies into the application
#   jar. Without this flag the executors cannot load the GraphFrames classes
#   referenced by the serialized tasks. That surfaces as the misleading
#   ClassCastException (List$SerializationProxy) seen only in cluster mode.
#   This flag resolves the artifact (plus its transitive dependencies) and
#   distributes it to the driver and every executor.
# --repositories: Spark 2.1's built-in Spark Packages repository lived on
#   dl.bintray.com, which has been shut down; point at the current host.
~/spark-2.1.0/bin/spark-submit \
  --class "PSKQ" \
  --master spark://192.168.0.42:7077 \
  --packages graphframes:graphframes:0.5.0-spark2.1-s_2.11 \
  --repositories https://repos.spark-packages.org/ \
  target/scala-2.11/pskq_2.11-1.0.jar
我的 SBT 构建文件：
// sbt build definition for the PSKQ Spark application (Scala 2.11 / Spark 2.1.0).
name := "PSKQ"

version := "1.0"

scalaVersion := "2.11.8"

// Spark Packages artifacts (including graphframes) moved off dl.bintray.com,
// which was shut down; repos.spark-packages.org is the current host.
resolvers += "Spark Packages Repo" at "https://repos.spark-packages.org/"

libraryDependencies ++= Seq(
  // Spark and Hadoop are already installed on every cluster node, so they are
  // marked "provided": available at compile time but not packaged, which keeps
  // a second (possibly mismatched) copy away from the executors' classpath.
  // With "provided", launch the app through spark-submit (also for
  // --master local[*]) rather than `sbt run`.
  "org.apache.spark" %% "spark-core" % "2.1.0" % "provided",
  "org.apache.spark" %% "spark-sql" % "2.1.0" % "provided",
  // NOTE(review): 2.2.0 looks much older than the Hadoop line Spark 2.1.0 is
  // usually built against -- confirm it matches the cluster, or drop it.
  "org.apache.hadoop" % "hadoop-hdfs" % "2.2.0" % "provided",
  "org.apache.spark" %% "spark-graphx" % "2.1.0" % "provided",
  // graphframes is NOT part of the Spark distribution, and `sbt package` does
  // not put dependencies into the application jar. It must be shipped to the
  // executors explicitly: spark-submit --packages
  // graphframes:graphframes:0.5.0-spark2.1-s_2.11, or build a fat jar
  // (e.g. with sbt-assembly).
  "graphframes" % "graphframes" % "0.5.0-spark2.1-s_2.11"
)