2

我正在尝试通过使用 find 和 watch 组合来实现一个使自身保持最新的结构。例如,对于用户集合,我们找到所有用户并将它们存储在哈希图中。之后,我们打开观察流并根据传入的事件更新我们的 hashmap。但是有一个问题——如果在读取用户和打开流之间发生变化,我们最终可能会失去同步。

我的想法是使用 startAtOperationTime 选项,但我们无法可靠地知道我们需要的时间戳。我也看不到交易的方法。

问题是 - 我们如何在读取操作后准确地打开更改流并且不丢失任何数据。

4

1 回答 1

1

好问题

恕我直言,如果不知道这些 delta 应该应用于什么初始状态,那么发出 delta 更改的手表是非常没用的,因此您当然也需要进行查找。AFAIK mongo 没有原子的“查找和观察”命令。

即使从问题中删除增量并仅使用该fullDocument=updateLookup选项,仍然存在同步问题。考虑这种方法:

  1. 集合.watch()
  2. changeStream.pause()
  3. 等待 collection.find().toArray()
  4. changeStream.resume()

我们需要先设置手表,这样我们才能捕捉到在寻找时发生的任何事情。乍一看,这看起来不错。如果在您进行查找时发生了某些事情,并且流中有一些更改事件,这些将在您恢复时得到处理。由于您在查找之前启动了监视,因此更糟糕的情况是更改流包含在查找之前发生的一些更改,并且可能导致保存旧状态。这通常无关紧要,因为更改流也将包含最近的更新并最终保存正确的状态。

但是在发现之前,手表真的在 mongo 集群上运行吗?

查找查询是否甚至转到了集群中正确同步的节点?

由于统一拓扑和连接池的概念,我认为我们不可能知道这些问题的答案。我们不知道上述几行命令最终会使用哪个连接。如果 mongo 处于离线状态,客户端将缓冲诸如 watch 和 find 之类的命令,然后在池连接可用时释放它们。但是由于有多个连接在起作用(可能会在错误的时间阻塞),所以不能保证第 1 行中的 watch 命令会在第 3 行中的 find 命令之前到达 mongo。所以我很确定这个答案将在 99% 的时间内工作,但它不是 100% 可靠的。

您可以尝试通过检查resumeTokenChanged事件来确认 changeStream 已连接,然后进行查找,但如果您的集群遇到一些节点 oplog 需要一段时间才能同步的问题,您仍然可以“查找()”旧数据并且看得太晚了。这意味着错过了更新。

更新

我认为我们希望的最佳解决方案是结合上述所有内容,并根据您所需的集群同步宽限期添加延迟。

  1. collection.watch()
  2. resumeTokenChanged通过检查事件等到 changeStream 是新鲜的。这也意味着我们是有联系的。
  3. 如果我们没有及时收到resumeTokenChanged,请设置超时以考虑 changeStream 陈旧。这意味着我们的数据也可能是陈旧的。
  4. 等待集群节点同步的宽限期
  5. changeStream.pause()
  6. 如果我们仍然没有收到change来自 changeStream 的任何事件,那么,await collection.find().toArray()
  7. changeStream.resume()
  8. 如果 changeStream 发出error事件,则重新启动整个过程。
于 2021-05-21T00:46:06.183 回答