
How do I copy a column family from one Cassandra cluster to another?

Scenario:

  1. I only have the host IPs (for both the source and target clusters), the ports, the keyspace name, and the column family name.
  2. I have already created the metadata in the target cluster (only the data needs to be copied).
  3. Highest priority: I would like to do this in one or more Spark jobs using the spark-cassandra-connector Java API (creating a DataFrame in between and then saving it).
  4. Medium priority: using the Cassandra Java driver from DataStax.
  5. Lowest priority: using a cassandra-jdbc driver together with the spark-cassandra-connector Java API.

Any help would be appreciated. Thanks in advance.


4 Answers


Take a snapshot on the existing cluster and use the bulk loader on the target cluster — no Spark required (although you could use it).

The documentation covers this process in detail, but here is a high-level overview of what you need to do:

  1. Take a snapshot on the existing cluster.
  2. Send (scp) the snapshot to a node in the new cluster.
  3. Create a clone of the schema (you said you have already done this).
  4. Use the bulk loader to stream the sstables from the snapshot into the new cluster.
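The steps above can be sketched roughly as the following command sequence (the keyspace, table, paths, and host names are placeholders — substitute your own, and repeat the snapshot on every source node that owns data):

```shell
# 1. Take a snapshot of the keyspace on a source node
nodetool snapshot -t copy_snapshot my_keyspace

# 2. Copy the snapshot sstables to a node in the target cluster
scp -r /var/lib/cassandra/data/my_keyspace/my_table-*/snapshots/copy_snapshot \
    user@target-node:/tmp/my_keyspace/my_table

# 3. The schema already exists on the target (per the question)

# 4. Stream the sstables into the target cluster with the bulk loader
#    (sstableloader expects a directory path ending in <keyspace>/<table>)
sstableloader -d target-node-ip /tmp/my_keyspace/my_table
```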
answered 2015-09-07T13:02:13.927

After a lot of effort we found the solution, and it is surprisingly simple. We can do this nicely with Spark — here is how.

What we were doing (did not work):

// Reading from the first Cassandra cluster

dataframe = cassandraSQLContext.read()
        .format("org.apache.spark.sql.cassandra")
        .options(otherOptionsMap)
        .option("spark.cassandra.connection.host", "firstClusterIP")
        .load();

// Writing to the second Cassandra cluster

dataframe.write().mode(saveMode)
        .options(otherOptionsMap)
        .option("spark.cassandra.connection.host", "secondClusterIP")
        .save();

What worked fine:

// Reading from the first Cassandra cluster

dataframe = cassandraSQLContext.read()
        .format("org.apache.spark.sql.cassandra")
        .options(otherOptionsMap)
        .option("spark_cassandra_connection_host", "firstClusterIP")
        .load();

// Writing to the second Cassandra cluster

dataframe.write().mode(saveMode)
        .options(otherOptionsMap)
        .option("spark_cassandra_connection_host", "secondClusterIP")
        .save();

Yes, that's right: you just change the periods (`.`) in the spark-cassandra host property to underscores (`_`). I don't know whether this is a bug in the spark-cassandra-connector.
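In other words, the workaround amounts to renaming the connector property key — replacing each period with an underscore — before passing it as a DataFrame option. A minimal sketch of that renaming (the class and helper names here are our own, purely illustrative; they are not part of the connector):

```java
public class OptionKeyDemo {
    // Convert a connector property name ("a.b.c") to the underscore
    // form ("a_b_c") that worked as a per-DataFrame option key above
    static String toOptionKey(String property) {
        return property.replace('.', '_');
    }

    public static void main(String[] args) {
        System.out.println(toOptionKey("spark.cassandra.connection.host"));
        // prints: spark_cassandra_connection_host
    }
}
```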

answered 2015-10-07T09:49:10.967

If you use the spark-cassandra-connector, it supports connecting to multiple clusters out of the box. The relevant code snippet:

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._

import org.apache.spark.SparkContext


def twoClusterExample(sc: SparkContext) = {
  val connectorToClusterOne = CassandraConnector(sc.getConf.set("spark.cassandra.connection.host", "127.0.0.1"))
  val connectorToClusterTwo = CassandraConnector(sc.getConf.set("spark.cassandra.connection.host", "127.0.0.2"))

  val rddFromClusterOne = {
    // Sets connectorToClusterOne as default connection for everything in this code block
    implicit val c = connectorToClusterOne
    sc.cassandraTable("ks","tab")
  }

  {
    //Sets connectorToClusterTwo as the default connection for everything in this code block
    implicit val c = connectorToClusterTwo
    rddFromClusterOne.saveToCassandra("ks","tab")
  }

}

See the connector's documentation for this and related example snippets.

answered 2016-02-19T10:30:54.017

Java example

This will work:

private static String sourceKeyspace = null;
private static String targetKeyspace = null;
private static String sourceHost = null;
private static String targetHost = null;
private static String sourceUsername = null;
private static String targetUsername = null;
private static String sourcePassword = null;
private static String targetPassword = null;
private static String sourceColumnFamily = null;
private static String targetColumnFamily = null;
private static String[] sourceColumns = null;
// Set all above values according to your requirements

private static JavaSparkContext sc;
SparkConf sparkConf;

// jobConfig below refers to your own application configuration object
sparkConf = new SparkConf(true).setAppName("Source Cassandra to Target Cassandra job");
sparkConf.setMaster(jobConfig.getString("spark.context-settings.master")); // Leave empty if you are running on a local Spark cluster
sparkConf
        .set("spark.cassandra.connection.host", sourceHost)
        .set("spark.cassandra.input.fetch.size_in_rows", jobConfig.getString("spark.context-settings.fetchsize"))
        .set("spark.cassandra.input.split.size_in_mb", jobConfig.getString("spark.context-settings.splitsize"))
        .set("spark.cassandra.auth.username", sourceUsername)
        .set("spark.cassandra.auth.password", sourcePassword)
        .set("cassandra.username", sourceUsername)
        .set("cassandra.password", sourcePassword)
        .set("spark.cassandra.input.consistency.level", jobConfig.getString("spark.context-settings.spark.cassandra.consistency.level"))
        .set("spark.executor.memory", jobConfig.getString("spark.context-settings.spark.executor.memory"))
        .set("spark.driver.memory",jobConfig.getString("spark.context-settings.spark.driver.memory"))
        .set("spark.executor.tasks", jobConfig.getString("spark.context-settings.spark.executor.tasks"))
        .set("spark.mesos.coarse", "true")
        .set("spark.cores.max", jobConfig.getString("spark.context-settings.spark.cores.max"))
        .set("spark.scheduler.mode", "FAIR")
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        sc = new JavaSparkContext(sparkConf);

JavaRDD<Tuple2<String, Integer>> tupleRows = CassandraJavaUtil.javaFunctions(sc.sc())
    .cassandraTable(sourceKeyspace, sourceColumnFamily)
    .select(sourceColumns)
    .map(row -> {
        String authorName = row.getString("author_name");
        Integer numBooks = row.getInt("num_books");
        return new Tuple2<>(authorName, numBooks);
    });

The main part, using com.datastax.spark.connector.cql.CassandraConnector and writerBuilder:

CassandraConnector targetConnection = CassandraConnector.apply(
    sparkConf.set("spark.cassandra.connection.host",targetHost)
    .set("spark.cassandra.auth.username", targetUsername)
    .set("spark.cassandra.auth.password", targetPassword)
    .set("cassandra.username", targetUsername)
    .set("cassandra.password", targetPassword)
);

CassandraJavaUtil.javaFunctions(tupleRows).writerBuilder(targetKeyspace, targetColumnFamily, mapTupleToRow(String.class, Integer.class))
.withConnector(targetConnection)
.saveToCassandra();

sc.stop();

Voila! You're done!

https://datastax-oss.atlassian.net/browse/SPARKC-340

answered 2018-02-26T11:22:05.353