5

我是 Spark 和 Cassandra 的新手。在尝试提交 Spark 作业时,我在连接到 Cassandra 时遇到错误。

细节:

版本:

Spark : 1.3.1 (build for hadoop 2.6 or later : spark-1.3.1-bin-hadoop2.6)
Cassandra : 2.0
Spark-Cassandra-Connector: 1.3.0-M1
scala : 2.10.5

Spark 和 Cassandra 在一个虚拟集群上 集群详情:

Spark Master : 192.168.101.13
Spark Slaves : 192.168.101.11 and 192.168.101.12
Cassandra Nodes: 192.168.101.11 (seed node) and 192.168.101.12

我正在尝试通过我的客户端计算机(笔记本电脑)- 172.16.0.6 提交工作。在谷歌搜索这个错误之后,我确保我可以从客户端机器上 ping 集群上的所有机器:spark master/slaves 和 cassandra 节点,并且还禁用了所有机器上的防火墙。但我仍在努力解决这个错误。

卡桑德拉.yaml

listen_address: 192.168.101.11 (192.168.101.12 on other cassandra node)
start_native_transport: true
native_transport_port: 9042
start_rpc: true
rpc_address: 192.168.101.11 (192.168.101.12 on other cassandra node)
rpc_port: 9160

我正在尝试运行一个最小的示例作业

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import com.datastax.spark.connector._

val rdd = sc.cassandraTable("test", "words")
rdd.toArray.foreach(println)

要提交作业,我使用 spark-shell(:将代码粘贴到 spark shell 中):

    spark-shell --jars "/home/ameya/.m2/repository/com/datastax/spark/spark-cassandra-connector_2.10/1.3.0-M1/spark-cassandra-connector_2.10-1.3.0-M1.jar","/home/ameya/.m2/repository/com/datastax/cassandra/cassandra-driver-core/2.1.5/cassandra-driver-core-2.1.5.jar","/home/ameya/.m2/repository/com/google/collections/google-collections/1.0/google-collections-1.0.jar","/home/ameya/.m2/repository/io/netty/netty/3.8.0.Final/netty-3.8.0.Final.jar","/home/ameya/.m2/repository/com/google/guava/guava/14.0.1/guava-14.0.1.jar","/home/ameya/.m2/repository/io/dropwizard/metrics/metrics-core/3.1.0/metrics-core-3.1.0.jar","/home/ameya/.m2/repository/org/slf4j/slf4j-api/1.7.10/slf4j-api-1.7.10.jar","/home/ameya/.m2/repository/com/google/collections/google-collections/1.0/google-collections-1.0.jar","/home/ameya/.m2/repository/io/netty/netty/3.8.0.Final/netty-3.8.0.Final.jar","/home/ameya/.m2/repository/com/google/guava/guava/14.0.1/guava-14.0.1.jar","/home/ameya/.m2/repository/org/apache/cassandra/cassandra-clientutil/2.1.5/cassandra-clientutil-2.1.5.jar","/home/ameya/.m2/repository/joda-time/joda-time/2.3/joda-time-2.3.jar","/home/ameya/.m2/repository/org/apache/cassandra/cassandra-thrift/2.1.3/cassandra-thrift-2.1.3.jar","/home/ameya/.m2/repository/org/joda/joda-convert/1.2/joda-convert-1.2.jar","/home/ameya/.m2/repository/org/apache/thrift/libthrift/0.9.2/libthrift-0.9.2.jar","/home/ameya/.m2/repository/org/apache/thrift/libthrift/0.9.2/libthrift-0.9.2.jar" --master spark://192.168.101.13:7077 --conf spark.cassandra.connection.host=192.168.101.11 --conf spark.cassandra.auth.username=cassandra --conf spark.cassandra.auth.password=cassandra

我得到的错误:

warning: there were 1 deprecation warning(s); re-run with -deprecation for details
**java.io.IOException: Failed to open native connection to Cassandra at {192.168.101.11}:9042**
    at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:181)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:167)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:167)
    at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31)
    at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56)
    at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:76)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:104)
    at com.datastax.spark.connector.cql.CassandraConnector.withClusterDo(CassandraConnector.scala:115)
    at com.datastax.spark.connector.cql.Schema$.fromCassandra(Schema.scala:243)
    at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.tableDef(CassandraTableRowReaderProvider.scala:49)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.tableDef$lzycompute(CassandraTableScanRDD.scala:59)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.tableDef(CassandraTableScanRDD.scala:59)
    at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.verify(CassandraTableRowReaderProvider.scala:148)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.verify(CassandraTableScanRDD.scala:59)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.getPartitions(CassandraTableScanRDD.scala:118)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1512)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:813)
    at org.apache.spark.rdd.RDD.toArray(RDD.scala:833)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:33)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:38)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:40)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:42)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:44)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:46)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:48)
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:50)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:52)
    at $iwC$$iwC$$iwC.<init>(<console>:54)
    at $iwC$$iwC.<init>(<console>:56)
    at $iwC.<init>(<console>:58)
    at <init>(<console>:60)
    at .<init>(<console>:64)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:856)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:813)
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:656)
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:664)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:669)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:996)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:944)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1058)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
**Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /192.168.101.11:9042 (com.datastax.driver.core.TransportException: [/192.168.101.11:9042] Connection has been closed))**
    at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:223)
    at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:78)
    at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1236)
    at com.datastax.driver.core.Cluster.getMetadata(Cluster.java:333)
    at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:174)
    ... 71 more

谁能指出我在这里做错了什么?

4

5 回答 5

7

默认情况下,您没有指定spark.cassandra.connection.host spark 假定 cassandra 主机与 spark 主节点相同。

var sc:SparkContext=_
val conf = new SparkConf().setAppName("Cassandra Demo").setMaster(master)
.set("spark.cassandra.connection.host", "192.168.101.11")
c=new SparkContext(conf)

val rdd = sc.cassandraTable("test", "words")
rdd.toArray.foreach(println)

如果您在cassandra.yaml中正确设置了种子节点,它应该可以工作

于 2015-06-23T06:40:48.797 回答
6

我在一夜之间为这个问题苦苦挣扎,终于得到了一个有效的组合。我正在为那些可能遇到类似问题的人写下来。

首先,这是一个版本问题 cassandra-driver-core 的依赖。但是要找出有效的确切组合需要我相当多的时间。

其次,这是适合我的组合。

  1. Spark 1.6.2 与 Hadoop 2.6、cassandra 2.1.5(Ubuntu 14.04、Java 1.8)、
  2. 在built.sbt(sbt 程序集,scalaVersion := "2.10.5")中,使用

"com.datastax.spark" %% "spark-cassandra-connector" % "1.4.0", "com.datastax.cassandra" % "cassandra-driver-core" % "2.1.5"

第三,让我澄清一下我的不满。使用 spark-cassandra-connector 1.5.0,我可以在具有远程 cassandra 连接的同一台机器上使用带有 --master "local[2]" 的 spark-submit 运行程序集,没有任何问题。连接器 1.5.0、1.6.0 与 Cassandra 2.0、2.1、2.2、3,4 的任何组合都可以正常工作。但是,如果我尝试使用 --master yarn --deploy-mode 集群将作业从同一台机器(NodeManager)提交到集群,那么我总是会遇到问题:无法在 {192.168. 122.12}:9042

这里发生了什么?DataStarX的任何人都可以看看这个问题吗?我只能猜测它与“cqlversion”有关,它应该与Cassandra集群的版本匹配。

有人知道更好的解决方案吗?[卡桑德拉],[阿帕奇火花]

于 2016-07-25T19:32:46.300 回答
4

它终于奏效了:

脚步 :

  1. 将 listen_address 设置为 EC2 实例的私有 IP。
  2. 不要设置任何广播地址
  3. 将 rpc_address 设置为 0.0.0.0
  4. 将 broadcast_rpc_address 设置为 EC2 实例的公共 IP。
于 2016-10-22T06:57:01.967 回答
2

问题解决了。这是由于依赖关系有些混乱。我构建了一个带有依赖项的 jar 并将其传递给 spark-submit,而不是单独指定依赖 jar。

于 2015-06-26T17:33:15.700 回答
0

这是 cassandra-driver-core jar 依赖项的版本问题。

The provided cassandra's version is 2.0
The provided cassandra-driver-core jar's version is 2.1.5

jar 应该与运行的 cassandra 的版本相同。

In this case, the included jar file should be cassandra-driver-core-2.0.0.jar
于 2016-06-20T11:03:50.690 回答