1

我正在尝试使用 Clojure 和 Flambo 使用 Spark/ GraphX

这是我最终得到的代码:

project.clj文件中:

(defproject spark-tests "0.1.0-SNAPSHOT"
  :description "FIXME: write description"
  :url "http://example.com/FIXME"
  :license {:name "Eclipse Public License"
            :url "http://www.eclipse.org/legal/epl-v10.html"}
  :dependencies [[org.clojure/clojure "1.6.0"]
                 [yieldbot/flambo "0.5.0"]]
  :main ^:skip-aot spark-tests.core
  :target-path "target/%s"
  :checksum :warn
  :profiles {:dev {:aot [flambo.function]}
             :uberjar {:aot :all}
             :provided {:dependencies
                        [[org.apache.spark/spark-core_2.10 "1.3.0"]
                         [org.apache.spark/spark-core_2.10 "1.2.0"]
                         [org.apache.spark/spark-graphx_2.10 "1.2.0"]]}})

然后是我的 Clojurecore.clj文件:

(ns spark-tests.core  
  (:require [flambo.conf :as conf]
            [flambo.api :as f]
            [flambo.tuple :as ft])
  (:import (org.apache.spark.graphx Edge)
           (org.apache.spark.graphx.impl GraphImpl)))

(defonce c (-> (conf/spark-conf)
               (conf/master "local")
               (conf/app-name "flame_princess")))

(defonce sc (f/spark-context c))

(def users (f/parallelize sc [(ft/tuple 3 ["rxin" "student"])
                              (ft/tuple 7 ["jgonzal" "postdoc"])
                              (ft/tuple 5 ["franklin" "prof"])]))

(defn edge
  [source dest attr]
  (new Edge (long source) (long dest) attr))

(def relationships (f/parallelize sc [(edge 3 7 "collab")
                                      (edge 5 3 "advisor")]))

(def g (new GraphImpl users relationships))

当我运行该代码时,我收到以下错误:

1. Caused by java.lang.ClassCastException
   Cannot cast org.apache.spark.api.java.JavaRDD to
   scala.reflect.ClassTag

  Class.java: 3258  java.lang.Class/cast
  Reflector.java:  427  clojure.lang.Reflector/boxArg
  Reflector.java:  460  clojure.lang.Reflector/boxArgs

免责声明:我没有 Scala 知识。

然后我想可能是因为Flambo我们使用时返回了一个JavaRDD f/parallelize。然后我尝试将 JavaRDD 转换为 GraphX 示例中使用的简单 RDD:

(def g (new GraphImpl (.rdd users) (.rdd relationships)))

但是我得到了同样的错误,但是对于ParallelCollectionRDD班级......

从那里,我知道可能导致这种情况的原因。Graph 类的Java API 在这里,同一类的 Scala API 在这里

我不清楚的是如何在 Clojure 中有效地使用该类签名:

org.apache.spark.graphx.Graph<VD,ED>

(Graph是一个抽象类,但我在这个例子中尝试使用GraphImpl)

我想做的是使用 Clojure重新创建那个 Scala 示例。

任何提示将不胜感激!

4

1 回答 1

1

终于做对了(我认为)。这是似乎正在工作的代码:

(ns spark-tests.core
  (:require [flambo.conf :as conf]
            [flambo.api :as f]
            [flambo.tuple :as ft])
  (:import (org.apache.spark.graphx Edge
                                    Graph)
           (org.apache.spark.api.java JavaRDD
                                      StorageLevels)
           (scala.reflect ClassTag$)))

(defonce c (-> (conf/spark-conf)
               (conf/master "local")
               (conf/app-name "flame_princess")))

(defonce sc (f/spark-context c))

(def users (f/parallelize sc [(ft/tuple 3 ["rxin" "student"])
                              (ft/tuple 7 ["jgonzal" "postdoc"])
                              (ft/tuple 5 ["franklin" "prof"])]))

(defn edge
  [source dest attr]
  (new Edge (long source) (long dest) attr))

(def relationships (f/parallelize sc [(edge 3 7 "collab")
                                      (edge 5 3 "advisor")
                                      (edge 7 3 "advisor")]))


(def g (Graph/apply (.rdd users)
                    (.rdd relationships)
                    "collab"
                    (StorageLevels/MEMORY_ONLY)
                    (StorageLevels/MEMORY_ONLY)
                    (.apply ClassTag$/MODULE$ clojure.lang.PersistentVector)
                    (.apply ClassTag$/MODULE$ java.lang.String)))

(println (.count (.edges g)))

此代码返回的3内容似乎是准确的。主要问题是我没有使用 Graph/Apply. 事实上,这似乎是创建所有对象的方式(看起来是构造函数......)。我不知道为什么会这样,但这可能是由于我缺乏 Scala 知识。如果有人知道,请告诉我为什么:)

之后,我只需要填写apply函数签名的空白即可。

需要注意的一点是最后两个参数:

  • scala.reflect.ClassTag<VD> evidence$17
  • scala.reflect.ClassTag<ED> evidence$18

这用于指示vertex attribute type( VD ) 和edge attribute type( ED ) 的 Scala。的类型ED是我用作类的第三个参数的对象的Edge类型。那么type ofVD就是tuple函数第二个参数的类型。

于 2015-05-22T17:30:06.283 回答