2

我对 Cassandra (2.1.11) 和 Spark (1.4.1) 都很陌生,并且有兴趣知道是否有人已经看到/开发了使用 Spark Streaming 对两个不同 Cassandra 表进行原子写入的解决方案。

我目前有两个表,它们保存相同的数据集,但具有不同的分区键。为简单起见,我将使用熟悉的 User 表示例进行说明:

CREATE TABLE schema1.user_by_user_id
(
    user_id uuid
    ,email_address text
    ,num int //a value that is frequently updated
    ,PRIMARY KEY (user_id)
);

CREATE TABLE schema1.user_by_email_address
(
    email_address text
    ,user_id uuid
    ,num int //a value that is frequently updated
    ,PRIMARY KEY (email_address)
);

email_address列将具有高基数(实际上它将在user_id值数量的 50% 到 100% 之间)。高基数使二级索引表现不佳,因此需要第二张表。

我正在使用 Spark Streaming 处理num列中的更改并更新这两个表。据我了解,该saveToCassandra()方法在 UNLOGGED BATCH 中为 RDD 中的每个项目执行写入,从而执行原子写入(如此处的“保存对象集合”部分所述。但是,saveToCassandra()只能用于保存到单个表。为了使 theschema1.user_by_user_idschema1.user_by_email_address表保持同步,我必须发出两个单独的saveToCassandra()调用:

rdd.saveToCassandra("schema1","user_by_user_id",SomeColumns("user_id","email"address","num"))
rdd.saveToCassandra("schema1","user_by_email_address",SomeColumns("user_id","email"address","num"))

每次调用中发生的写入都以原子方式完成,但两个调用一起不是原子的。第二次调用中的一些错误将使两个表不同步。

显然我的数据集和实际表结构比这更复杂,但我试图以尽可能简单的方式传达我的问题的要点。虽然我的问题是针对能够保存到两个表的,但我欢迎任何有关数据模型更改的替代建议,这将完全消除这种需求。

4

1 回答 1

1

首先要了解:UNLOGGED批次不是原子的。请参阅文档UNLOGGED批处理给您的唯一好处是能够使用相同的时间戳进行多次写入。

因此,如果您想进行多次调用saveToCassandra并让它们表现得好像是一次调用,只需为这两个调用指定 WRITETIME。一切完成后,所有修改后的数据都将具有相同的时间戳。

至于您如何对多个表进行原子更新的问题……您不能。卡桑德拉不支持它。

我能想到的最好的建议是创建自己的批处理日志,您可以在崩溃后查阅该批处理日志以确定需要重新同步的内容。

想象一下这样的事情:

CREATE TABLE batch_log
(
    id uuid,
    updated_users set<uuid>,
    PRIMARY KEY(id)
)

开始你的工作时,生成一个新的 uuid,它将作为这个工作的 id。然后,您将发出 3 次保存:

rdd.saveToCassandra("schema1", "batch_log", SomeColumns("batch_id", "user_id" append)
rdd.saveToCassandra("schema1","user_by_user_id",SomeColumns("user_id","email"address","num"))
rdd.saveToCassandra("schema1","user_by_email_address",SomeColumns("user_id","email"address","num"))

如果您的批处理完成没有任何崩溃,您可以删除batch_log创建的行。但是,如果系统在中途崩溃,那么一旦事情恢复在线,您可以查阅batch_log以获取更新的用户列表。去查询这些用户的电子邮件地址,然后更新user_by_email_address表格。完成此修复后,您可以删除您的batch_log.

实际上,您正在“手动”实施 Cassandra LOGGED BATCH。

于 2015-12-01T20:44:14.073 回答