我正在尝试通过使用 find 和 watch 组合来实现一个使自身保持最新的结构。例如,对于用户集合,我们找到所有用户并将它们存储在哈希图中。之后,我们打开观察流并根据传入的事件更新我们的 hashmap。但是有一个问题——如果在读取用户和打开流之间发生变化,我们最终可能会失去同步。
我的想法是使用 startAtOperationTime 选项,但我们无法可靠地知道我们需要的时间戳。我也看不到交易的方法。
问题是 - 我们如何在读取操作后准确地打开更改流并且不丢失任何数据。
我正在尝试通过使用 find 和 watch 组合来实现一个使自身保持最新的结构。例如,对于用户集合,我们找到所有用户并将它们存储在哈希图中。之后,我们打开观察流并根据传入的事件更新我们的 hashmap。但是有一个问题——如果在读取用户和打开流之间发生变化,我们最终可能会失去同步。
我的想法是使用 startAtOperationTime 选项,但我们无法可靠地知道我们需要的时间戳。我也看不到交易的方法。
问题是 - 我们如何在读取操作后准确地打开更改流并且不丢失任何数据。
好问题
恕我直言,如果不知道这些 delta 应该应用于什么初始状态,那么发出 delta 更改的手表是非常没用的,因此您当然也需要进行查找。AFAIK mongo 没有原子的“查找和观察”命令。
即使从问题中删除增量并仅使用该fullDocument=updateLookup
选项,仍然存在同步问题。考虑这种方法:
我们需要先设置手表,这样我们才能捕捉到在寻找时发生的任何事情。乍一看,这看起来不错。如果在您进行查找时发生了某些事情,并且流中有一些更改事件,这些将在您恢复时得到处理。由于您在查找之前启动了监视,因此更糟糕的情况是更改流包含在查找之前发生的一些更改,并且可能导致保存旧状态。这通常无关紧要,因为更改流也将包含最近的更新并最终保存正确的状态。
但是在发现之前,手表真的在 mongo 集群上运行吗?
查找查询是否甚至转到了集群中正确同步的节点?
由于统一拓扑和连接池的概念,我认为我们不可能知道这些问题的答案。我们不知道上述几行命令最终会使用哪个连接。如果 mongo 处于离线状态,客户端将缓冲诸如 watch 和 find 之类的命令,然后在池连接可用时释放它们。但是由于有多个连接在起作用(可能会在错误的时间阻塞),所以不能保证第 1 行中的 watch 命令会在第 3 行中的 find 命令之前到达 mongo。所以我很确定这个答案将在 99% 的时间内工作,但它不是 100% 可靠的。
您可以尝试通过检查resumeTokenChanged
事件来确认 changeStream 已连接,然后进行查找,但如果您的集群遇到一些节点 oplog 需要一段时间才能同步的问题,您仍然可以“查找()”旧数据并且看得太晚了。这意味着错过了更新。
更新
我认为我们希望的最佳解决方案是结合上述所有内容,并根据您所需的集群同步宽限期添加延迟。
collection.watch()
resumeTokenChanged
通过检查事件等到 changeStream 是新鲜的。这也意味着我们是有联系的。resumeTokenChanged
,请设置超时以考虑 changeStream 陈旧。这意味着我们的数据也可能是陈旧的。changeStream.pause()
change
来自 changeStream 的任何事件,那么,await collection.find().toArray()
changeStream.resume()
error
事件,则重新启动整个过程。