
I'm following the guide at https://github.com/snowplow/snowplow-scala-analytics-sdk to parse Snowplow events with Spark.

My code looks like this:

import com.snowplowanalytics.snowplow.analytics.scalasdk.json.EventTransformer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import java.util.Calendar
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import scala.collection.mutable.ArrayBuffer
import java.text.SimpleDateFormat
import java.io.{InputStream, Serializable}
import java.net.URI
import java.util.Properties
import org.apache.spark.sql.SQLContext
import java.util.Date
import org.apache.spark.sql.SQLContext._
import org.apache.spark.sql.SparkSession
import scalaz._
import scalaz.Scalaz._

object NcgGraphx {
  def main(args: Array[String]): Unit = {

    if (args.isEmpty) {
      println("Please supply input file path")
      System.exit(1)
    }
    val file = args(0)
    val conf = new SparkConf().setAppName("NcgGraphx").setMaster("local")
    println("created conf object")
    val ctx = new SparkContext(conf)
    println("created spark context")
    val spark = SparkSession.builder().appName("NcgGraphX").getOrCreate
    val sqlContext = new SQLContext(ctx)
    import sqlContext.implicits._
    import spark.implicits._

    println("loading json file")
    val input = ctx.textFile(file)
    println("after loading file")
    input.take(1).foreach(println)
    val events = input.map(line => EventTransformer.transform(line)).flatMap(_.toOption)
    println("after transforming events")
    events.take(1).foreach(println)
//    val dataframe = spark.read.json(events)
//    dataframe.show()
  }
}

My build.sbt looks like this:

name := "ncgaka-graphx"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.0.0" % "provided",
  "org.apache.spark" %% "spark-sql" % "2.0.0",
  "org.apache.spark" %% "spark-hive" % "2.0.0",
  "org.apache.spark" %% "spark-graphx" % "2.0.0",
  "com.snowplowanalytics" %% "snowplow-scala-analytics-sdk" % "0.1.0",
  "org.scalaz" %% "scalaz-core" % "7.2.5",
  "org.scalaz" %% "scalaz-concurrent" % "7.2.5"
)

resolvers ++= Seq("Snowplow Analytics" at "http://maven.snplow.com/releases/")

When I try to run the JAR, I get the following runtime exception:

Exception in thread "main" java.lang.NoClassDefFoundError: scalaz/Validation
    at NcgGraphx$.main(NcgGraphx.scala:42)
    at NcgGraphx.main(NcgGraphx.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:729)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: scalaz.Validation
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)

Any idea what I'm missing?


1 Answer


The problem is that you're using scalaz 7.2.5, which is not binary compatible with the scalaz version used by snowplow-scala-analytics-sdk.
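
The scalaz/Validation in the stack trace comes from the SDK itself: EventTransformer.transform hands back a scalaz Validation, so a binary-compatible scalaz must be on the runtime classpath even though your own code never constructs one. A minimal sketch of the dependency (assuming the 0.1.0 signature of transform; the exact type parameters may differ):

import scalaz.Validation
import com.snowplowanalytics.snowplow.analytics.scalasdk.json.EventTransformer

object ValidationCheck extends App {
  // transform produces a scalaz Validation, so scalaz classes get loaded
  // the moment this line runs; with a mismatched scalaz binary on the
  // classpath the JVM fails with NoClassDefFoundError: scalaz/Validation.
  val result: Validation[_, _] = EventTransformer.transform("not a real event line")
  println(result)
}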

If you need a version that is binary compatible and available for Scala 2.11, you can use 7.0.9:

"org.scalaz" %% "scalaz-core" % "7.0.9"
answered 2016-12-08T05:43:53.157