我有一个 spark 应用程序,它从一个 cassandra 集群读取数据,并在一些计算后将数据保存到另一个 cassandra 集群。我只能在 sparkconf 中设置 1 个 cassandra 配置。但我需要再连接 1 个 cassandra 集群。
我看到一个用于连接到 cassandra 的 CassandraConnector 类,但它使用 CassandraConnectorConf 对象来创建一个对象,该对象需要很多我不知道的参数。
任何帮助都会有所帮助
我有一个 spark 应用程序,它从一个 cassandra 集群读取数据,并在一些计算后将数据保存到另一个 cassandra 集群。我只能在 sparkconf 中设置 1 个 cassandra 配置。但我需要再连接 1 个 cassandra 集群。
我看到一个用于连接到 cassandra 的 CassandraConnector 类,但它使用 CassandraConnectorConf 对象来创建一个对象,该对象需要很多我不知道的参数。
任何帮助都会有所帮助
使用以下代码:
SparkConf confForCassandra = new SparkConf().setAppName("ConnectToCassandra")
.setMaster("local[*]")
.set("spark.cassandra.connection.host", "<cassandraHost>");
CassandraConnector connector = CassandraConnector.apply(confForCassandra);
javaFunctions(rdd).writerBuilder("keyspace", "table", mapToRow(Table.class)).withConnector(connector).saveToCassandra();
如果要使用Scala和 Spark 连接到两个 Cassandra 集群,可以使用以下代码:
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.SparkContext
def twoClusterExample ( sc: SparkContext) = {
val connectorToClusterOne = CassandraConnector(sc.getConf.set("spark.cassandra.connection.host", "127.0.0.1"))
val connectorToClusterTwo = CassandraConnector(sc.getConf.set("spark.cassandra.connection.host", "127.0.0.2"))
val rddFromClusterOne = {
// Sets connectorToClusterOne as default connection for everything in this code block
implicit val c = connectorToClusterOne
sc.cassandraTable("ks","tab")
}
{
//Sets connectorToClusterTwo as the default connection for everything in this code block
implicit val c = connectorToClusterTwo
rddFromClusterOne.saveToCassandra("ks","tab")
}
}
原始代码由 RusselSpitzer 在这里编写:https ://gist.github.com/RussellSpitzer/437f57dae4fd4bc4f32d
目前没有办法用Python和 Spark 做到这一点。