1

我有一个看起来像这样的 Cassandra 表

CREATE TABLE tmp.inventory (
    t_id text,
    is_available boolean,
    modified_at bigint,
    price double,
    available_units bigint,
    PRIMARY KEY(t_id, modified_at)
) WITH CLUSTERING ORDER BY (modified_at);

我有一个流管道来更新 Cassandra 中的项目。流式管道每隔一段时间设置检查点。因此,当管道失败时,它将重新处理自上次成功检查点以来的源数据。并且当它在失败后重新处理时,它会尝试覆盖Cassandra中已经成功写入的数据(即在最后一个成功的检查点之后但在失败之前)。我正在考虑利用modified_at专栏来实现这一目标。就像是

UPDATE tmp.inventory SET is_available = ? WHERE t_id = ? AND modified_at < ?

只有当 Cassandra 中的 modified_at 小于管道中的 modified_at 时,我才尝试进行更新。然而,这抛出InvalidQueryException: Slice restrictions are not supported on the clustering columns in UPDATE statements

我虽然在这种情况下 IF 条件可以提供帮助。

UPDATE tmp.inventory SET is_available = ? WHERE t_id = ? IF modified_at < ?

但这会抛出InvalidQueryException: PRIMARY KEY column 'modified_at' cannot have IF conditions

那么处理这个问题的理想方法是什么?

编辑 如果我在这个表中只有这些字段,那么重新处理事件可能不是什么大问题,因为当管道赶上实时流但说还有另一个时,它最终会变得一致使用当前价格、可用单位等更新同一张表的流式作业。在这种情况下,如果其中一项作业失败并重新启动,则该表可能处于不一致状态。

4

1 回答 1

0

为了避免一个线程可以在另一个线程已经插入新数据之后写入旧数据的情况,您可以USING TIMESTAMP在执行 INSERT 或 UPDATE 时使用(在 Cassandra 中,无论如何都是 UPSERT,因此从语法角度来看,使用 INSERT 可能更容易,恕我直言)。这个想法是您明确指定记录的时间戳,因此当另一个线程比前一个线程插入较旧的数据时,数据将被插入,但他们不会获胜,因为 Cassandra 使用时间戳(明确指定)来检测最新版本。像这样的东西:

INSERT INTO tmp.inventory (t_id, is_available, modified_at)
  VALUES (?, ?,?)
  USING TIMESTAMP <modified_at*1000>

唯一要记住的是,在 中指定的值USING TIMESTAMP使用微秒而不是毫秒,并且您需要计算的值<modified_at*1000>- 您不能在那里使用表达式(这里只是举例)。

于 2021-04-25T07:14:39.547 回答