我有一个看起来像这样的 Cassandra 表
CREATE TABLE tmp.inventory (
t_id text,
is_available boolean,
modified_at bigint,
price double,
available_units bigint,
PRIMARY KEY(t_id, modified_at)
) WITH CLUSTERING ORDER BY (modified_at);
我有一个流管道来更新 Cassandra 中的项目。流式管道每隔一段时间设置检查点。因此,当管道失败时,它将重新处理自上次成功检查点以来的源数据。并且当它在失败后重新处理时,它会尝试覆盖Cassandra中已经成功写入的数据(即在最后一个成功的检查点之后但在失败之前)。我正在考虑利用modified_at
专栏来实现这一目标。就像是
UPDATE tmp.inventory SET is_available = ? WHERE t_id = ? AND modified_at < ?
只有当 Cassandra 中的 modified_at 小于管道中的 modified_at 时,我才尝试进行更新。然而,这抛出InvalidQueryException: Slice restrictions are not supported on the clustering columns in UPDATE statements
我虽然在这种情况下 IF 条件可以提供帮助。
UPDATE tmp.inventory SET is_available = ? WHERE t_id = ? IF modified_at < ?
但这会抛出InvalidQueryException: PRIMARY KEY column 'modified_at' cannot have IF conditions
那么处理这个问题的理想方法是什么?
编辑 如果我在这个表中只有这些字段,那么重新处理事件可能不是什么大问题,因为当管道赶上实时流但说还有另一个时,它最终会变得一致使用当前价格、可用单位等更新同一张表的流式作业。在这种情况下,如果其中一项作业失败并重新启动,则该表可能处于不一致状态。