4

我正在使用 datastax 提供的 spark-cassandra-connector 1.1.0。我注意到有趣的问题,但我不确定为什么会发生这样的事情:当我广播 cassandra 连接器并尝试在执行程序上使用它时,我收到异常提示我的配置无效无法连接到 0.0.0 的 Cassandra。

示例堆栈跟踪:

java.io.IOException: Failed to open native connection to Cassandra at {0.0.0.0}:9042
        at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:174)
        at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:160)
        at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:160)
        at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:36)
        at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:61)
        at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:71)
        at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:97)
...

但是,如果我在不广播的情况下使用它,一切正常。

对我来说也很奇怪,在驱动程序端广播值打印正确的配置,但在执行器端没有。

司机端:

  val dbConf = ssc.sparkContext.getConf
  val connector = CassandraConnector(dbConf)
  println(connector.hosts) //Set(10.20.1.5) 
  val broadcastedConnector = ssc.sparkContext.broadcast(connector)
  println(broadcastedConnector.value.hosts) //Set(10.20.1.5) 

执行方:

mapPartition{
...
 println(broadcastedConnector.hosts) // Set(0.0.0.)
...
}

有人可以解释为什么它以这种方式工作,以及如何以一种可以在执行者方面使用的方式广播 Cassandra 连接器。

更新同样的问题出现在 1.2.3 版本的连接器中。

4

1 回答 1

3

没有理由广播 Cassandra 连接器。在并行化闭包中使用它只会序列化配置并在执行器上创建新连接,或者使用现有的执行器连接(如果存在)。

于 2015-07-18T20:42:07.287 回答