我正在尝试使用 Clojure 和 Flambo 使用 Spark/ GraphX。
这是我最终得到的代码:
在project.clj
文件中:
(defproject spark-tests "0.1.0-SNAPSHOT"
:description "FIXME: write description"
:url "http://example.com/FIXME"
:license {:name "Eclipse Public License"
:url "http://www.eclipse.org/legal/epl-v10.html"}
:dependencies [[org.clojure/clojure "1.6.0"]
[yieldbot/flambo "0.5.0"]]
:main ^:skip-aot spark-tests.core
:target-path "target/%s"
:checksum :warn
:profiles {:dev {:aot [flambo.function]}
:uberjar {:aot :all}
:provided {:dependencies
[[org.apache.spark/spark-core_2.10 "1.3.0"]
[org.apache.spark/spark-core_2.10 "1.2.0"]
[org.apache.spark/spark-graphx_2.10 "1.2.0"]]}})
然后是我的 Clojurecore.clj
文件:
(ns spark-tests.core
(:require [flambo.conf :as conf]
[flambo.api :as f]
[flambo.tuple :as ft])
(:import (org.apache.spark.graphx Edge)
(org.apache.spark.graphx.impl GraphImpl)))
(defonce c (-> (conf/spark-conf)
(conf/master "local")
(conf/app-name "flame_princess")))
(defonce sc (f/spark-context c))
(def users (f/parallelize sc [(ft/tuple 3 ["rxin" "student"])
(ft/tuple 7 ["jgonzal" "postdoc"])
(ft/tuple 5 ["franklin" "prof"])]))
(defn edge
[source dest attr]
(new Edge (long source) (long dest) attr))
(def relationships (f/parallelize sc [(edge 3 7 "collab")
(edge 5 3 "advisor")]))
(def g (new GraphImpl users relationships))
当我运行该代码时,我收到以下错误:
1. Caused by java.lang.ClassCastException
Cannot cast org.apache.spark.api.java.JavaRDD to
scala.reflect.ClassTag
Class.java: 3258 java.lang.Class/cast
Reflector.java: 427 clojure.lang.Reflector/boxArg
Reflector.java: 460 clojure.lang.Reflector/boxArgs
免责声明:我没有 Scala 知识。
然后我想可能是因为Flambo
我们使用时返回了一个JavaRDD f/parallelize
。然后我尝试将 JavaRDD 转换为 GraphX 示例中使用的简单 RDD:
(def g (new GraphImpl (.rdd users) (.rdd relationships)))
但是我得到了同样的错误,但是对于ParallelCollectionRDD
班级......
从那里,我知道可能导致这种情况的原因。Graph 类的Java API 在这里,同一类的 Scala API 在这里。
我不清楚的是如何在 Clojure 中有效地使用该类签名:
org.apache.spark.graphx.Graph<VD,ED>
(Graph是一个抽象类,但我在这个例子中尝试使用GraphImpl)
我想做的是使用 Clojure重新创建那个 Scala 示例。
任何提示将不胜感激!