Java example
This will work:
import com.datastax.spark.connector.cql.CassandraConnector;
import com.datastax.spark.connector.japi.CassandraJavaUtil;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapTupleToRow;
private static String sourceKeyspace = null;
private static String targetKeyspace = null;
private static String sourceHost = null;
private static String targetHost = null;
private static String sourceUsername = null;
private static String targetUsername = null;
private static String sourcePassword = null;
private static String targetPassword = null;
private static String sourceColumnFamily = null;
private static String targetColumnFamily = null;
private static String[] sourceColumns = null;
// Set all of the above according to your environment (see the example values just below)
private static JavaSparkContext sc;
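For illustration, the assignments might look like this. All of these values are placeholders for your own clusters; only the two column names are taken from the map step further down:
// Hypothetical example values -- substitute your own hosts, keyspaces, and tables.
sourceKeyspace = "library";
targetKeyspace = "library";
sourceHost = "10.0.0.1";
targetHost = "10.0.1.1";
sourceColumnFamily = "books_by_author";
targetColumnFamily = "books_by_author";
sourceColumns = new String[] { "author_name", "num_books" };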
SparkConf sparkConf = new SparkConf(true).setAppName("Source Cassandra to Target Cassandra job");
sparkConf.setMaster(jobConfig.getString("spark.context-settings.master")); // Omit this line if the master is supplied by spark-submit, or use "local[*]" for a local run
sparkConf
        // Source cluster connection and credentials
        .set("spark.cassandra.connection.host", sourceHost)
        .set("spark.cassandra.auth.username", sourceUsername)
        .set("spark.cassandra.auth.password", sourcePassword)
        .set("cassandra.username", sourceUsername)
        .set("cassandra.password", sourcePassword)
        // Read tuning
        .set("spark.cassandra.input.fetch.size_in_rows", jobConfig.getString("spark.context-settings.fetchsize"))
        .set("spark.cassandra.input.split.size_in_mb", jobConfig.getString("spark.context-settings.splitsize"))
        .set("spark.cassandra.input.consistency.level", jobConfig.getString("spark.context-settings.spark.cassandra.consistency.level"))
        // Executor and scheduler settings
        .set("spark.executor.memory", jobConfig.getString("spark.context-settings.spark.executor.memory"))
        .set("spark.driver.memory", jobConfig.getString("spark.context-settings.spark.driver.memory"))
        .set("spark.executor.tasks", jobConfig.getString("spark.context-settings.spark.executor.tasks"))
        .set("spark.mesos.coarse", "true")
        .set("spark.cores.max", jobConfig.getString("spark.context-settings.spark.cores.max"))
        .set("spark.scheduler.mode", "FAIR")
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
sc = new JavaSparkContext(sparkConf);
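Note that jobConfig is never defined in the snippet; it is presumably a Typesafe Config object loaded from the job's configuration file. A minimal sketch, assuming an application.conf on the classpath containing a spark.context-settings block (the key names are whatever your config file uses):
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

// Assumption: loads spark.context-settings.* keys from application.conf on the classpath
Config jobConfig = ConfigFactory.load();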
JavaRDD<Tuple2<String, Integer>> tupleRows = CassandraJavaUtil.javaFunctions(sc)
        .cassandraTable(sourceKeyspace, sourceColumnFamily)
        .select(sourceColumns)
        .map(row -> {
            String authorName = row.getString("author_name");
            Integer numBooks = row.getInt("num_books"); // getInt, not getString: the column holds an int
            return new Tuple2<>(authorName, numBooks);
        });
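If you want to sanity-check the extraction before writing anything, a count will materialize the read. Keep in mind it scans the whole source table once, so it is optional:
// Optional sanity check: forces the read from the source cluster.
long rowCount = tupleRows.count();
System.out.println("Rows read from source: " + rowCount);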
The main part is using com.datastax.spark.connector.cql.CassandraConnector together with writerBuilder to point the write at the target cluster:
// Note: .set() mutates sparkConf in place, but the running SparkContext keeps
// its own copy of the conf, so this only affects the new connector.
CassandraConnector targetConnection = CassandraConnector.apply(
        sparkConf.set("spark.cassandra.connection.host", targetHost)
                 .set("spark.cassandra.auth.username", targetUsername)
                 .set("spark.cassandra.auth.password", targetPassword)
                 .set("cassandra.username", targetUsername)
                 .set("cassandra.password", targetPassword)
);
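If the target table does not exist yet, you can create it through the same connector before the write below. A minimal sketch, assuming the target keyspace already exists and the table mirrors the simple two-column schema implied by the map step above (author_name text, num_books int):
import com.datastax.driver.core.Session;

// Assumption: target schema mirrors the two columns read from the source.
Session session = targetConnection.openSession();
session.execute("CREATE TABLE IF NOT EXISTS " + targetKeyspace + "." + targetColumnFamily
        + " (author_name text PRIMARY KEY, num_books int)");
session.close();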
CassandraJavaUtil.javaFunctions(tupleRows)
        .writerBuilder(targetKeyspace, targetColumnFamily, mapTupleToRow(String.class, Integer.class))
        .withConnector(targetConnection)
        .saveToCassandra();
sc.stop();
Voila! You're done!
See also this related connector ticket: https://datastax-oss.atlassian.net/browse/SPARKC-340