0

所以我有一个 Spark-Cassandra 集群,我试图在上面执行 sql 查询。我用 sbt 组件构建了一个 jar,然后用 spark-submit 提交它。当我不使用 spark-sql 时,这可以正常工作。当我使用 spark sql 时出现错误,以下是输出:

2
CassandraRow{key: key1, value: 1}
3.0
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.trees.LeafNode$class.children(Lorg/apache/spark/sql/catalyst/trees/LeafNode;)Lscala/collection/Seq;
    at org.apache.spark.sql.cassandra.CassandraTableScan.children(CassandraTableScan.scala:19)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5$$anonfun$apply$6.apply(TreeNode.scala:280)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:279)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenUp(TreeNode.scala:292)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:247)
    at org.apache.spark.sql.execution.AddExchange.apply(Exchange.scala:128)
    at org.apache.spark.sql.execution.AddExchange.apply(Exchange.scala:124)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59)
    at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
    at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
    at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:34)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:59)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:51)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51)
    at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:1085)
    at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:1085)
    at org.apache.spark.sql.DataFrame.rdd(DataFrame.scala:889)
    at org.apache.spark.sql.DataFrame.foreach(DataFrame.scala:797)
    at CassSparkTest$.main(CassSparkTest.scala:22)
    at CassSparkTest.main(CassSparkTest.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

这是这项工作的scala代码,非常简单:

import org.apache.spark.{SparkContext, SparkConf}
import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra.CassandraSQLContext
import org.apache.spark.sql._

object CassSparkTest {
        def main(args: Array[String]) {
                val conf = new SparkConf(true)
                        .set("spark.cassandra.connection.host", "127.0.0.1")
                val sc = new SparkContext("spark://192.168.10.11:7077", "test", conf)

                val rdd = sc.cassandraTable("test", "kv")
                println(rdd.count)
                println(rdd.first)
                println(rdd.map(_.getInt("value")).sum)

                val sqlC = new CassandraSQLContext(sc)

                val sqlText = "SELECT * FROM test.kv"
                val df = sqlC.sql(sqlText)
                df.show()
                df.foreach(println)
        }
}

如您所见,spark 使用 sc.cassandraTable("test", "kv") 成功创建了一个 rdd,它能够获取计数、第一个值和总和。

当我运行 sql 查询时,我试图在 cqlsh 上运行 spark-sql,这是我得到的结果:

cqlsh> select * from test.kv;

 key  | value
------+-------
 key1 |     1
 key2 |     2

(2 rows)

这是 build.sbt 文件,一个包含 spark-cassandra-connector 的胖 jar 被保存在 lib 文件夹中,因此它被 sbt 作为 unmanagedDependancy 自动添加到类路径中(我不认为构建文件是考虑的问题我已经成功地创建了一个基于 C* 表的 rdd 并在其上使用了方法)

lazy val root = (project in file(".")).
        settings(
                name := "CassSparkTest",
                version := "1.0"
        )
libraryDependencies ++= Seq(
        "com.datastax.cassandra" % "cassandra-driver-core" % "2.1.5" % "provided",
        "org.apache.cassandra" % "cassandra-thrift" % "2.1.5" % "provided",
        "org.apache.cassandra" % "cassandra-clientutil" % "2.1.5" % "provided",
        //"com.datastax.spark" %% "spark-cassandra-connector" % "1.3.0-M1"  % "provided",
        "org.apache.spark" %% "spark-core" % "1.3.0" % "provided",
        "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided",
        "org.apache.spark" %% "spark-sql" % "1.3.0" % "provided"
)
4

1 回答 1

1

尝试 Spark 1.3.1

从 spark 连接器检查正确的版本 Versions.scala

于 2015-06-09T16:28:33.980 回答