1

从 Cassandra 表加载数据时,spark 分区表示具有相同分区键的所有行。但是,当我使用相同的分区键在 spark 中创建数据并使用 .repartitionByCassandraReplica(..) 方法对新的 RDD 重新分区时,它最终会出现在不同的 spark 分区中吗?如何使用 Spark-Cassandra 连接器定义的分区方案在 Spark 中实现一致的分区?

下载我测试的 CQL 和 Spark 作业代码的链接

版本和其他信息

  • 火花:1.3
  • 卡桑德拉:2.1
  • 连接器:1.3.1
  • Spark 节点 (5) 和 Cass* 集群节点 (4) 在不同的数据中心运行

代码提取。使用上面的链接下载代码以获取更多详细信息

第 1 步:将数据加载到 8 个 spark 分区中

Map<String, String> map = new HashMap<String, String>();
CassandraTableScanJavaRDD<TestTable> tableRdd = javaFunctions(conf)
 .cassandraTable("testkeyspace", "testtable", mapRowTo(TestTable.class, map));

第 2 步:将数据重新分区为 8 个分区

.repartitionByCassandraReplica(
        "testkeyspace",
        "testtable",
        partitionNumPerHost,
        someColumns("id"),
        mapToRow(TestTable.class, map));

第 3 步:打印两个 rdds 的分区 id 和值

rdd.mapPartitionsWithIndex(...{
@Override
 public Iterator<String> call(..) throws Exception {
 List<String> list = new ArrayList<String>();
 list.add("PartitionId-" + integer);

 while (itr.hasNext()) {
    TestTable value = itr.next();
    list.add(Integer.toString(value.getId()));
 }
 return list.iterator();
}
}, true).collect();

第 4 步:在分区 1 上打印的结果快照。两个 Rdd 不同,但期望相同

加载 Rdd 值

----------------------------
Table load - PartitionId -1
----------------------------
15
22

--------------------------------------
Repartitioned values - PartitionId -1
--------------------------------------
33
16
4

1 回答 1

2

Cassandra 副本的重新分区不会确定性地放置密钥。目前有一张票可以改变这一点。

https://datastax-oss.atlassian.net/projects/SPARKC/issues/SPARKC-278

现在的解决方法是将 Partitionspernode 参数设置为 1。

于 2015-11-24T19:11:24.690 回答