1

我有一个 spark 应用程序,它从一个 cassandra 集群读取数据,并在一些计算后将数据保存到另一个 cassandra 集群。我只能在 sparkconf 中设置 1 个 cassandra 配置。但我需要再连接 1 个 cassandra 集群。

我看到一个用于连接到 cassandra 的 CassandraConnector 类,但它使用 CassandraConnectorConf 对象来创建一个对象,该对象需要很多我不知道的参数。

任何帮助都会有所帮助

4

2 回答 2

2

使用以下代码:

SparkConf confForCassandra = new SparkConf().setAppName("ConnectToCassandra")
                .setMaster("local[*]")
                .set("spark.cassandra.connection.host", "<cassandraHost>");

CassandraConnector connector = CassandraConnector.apply(confForCassandra);

javaFunctions(rdd).writerBuilder("keyspace", "table", mapToRow(Table.class)).withConnector(connector).saveToCassandra();
于 2015-08-21T06:23:05.890 回答
1

如果要使用Scala和 Spark 连接到两个 Cassandra 集群,可以使用以下代码:

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._

import org.apache.spark.SparkContext


def twoClusterExample ( sc: SparkContext) = {
  val connectorToClusterOne = CassandraConnector(sc.getConf.set("spark.cassandra.connection.host", "127.0.0.1"))
  val connectorToClusterTwo = CassandraConnector(sc.getConf.set("spark.cassandra.connection.host", "127.0.0.2"))

  val rddFromClusterOne = {
    // Sets connectorToClusterOne as default connection for everything in this code block
    implicit val c = connectorToClusterOne
    sc.cassandraTable("ks","tab")
  }

  {
    //Sets connectorToClusterTwo as the default connection for everything in this code block
    implicit val c = connectorToClusterTwo
    rddFromClusterOne.saveToCassandra("ks","tab")
  }

} 

原始代码由 RusselSpitzer 在这里编写:https ://gist.github.com/RussellSpitzer/437f57dae4fd4bc4f32d

目前没有办法用Python和 Spark 做到这一点。

于 2015-12-30T12:31:46.340 回答