0

我想标题很混乱,但我找不到更好的。

我在 MongoDB 中有一个事件流,有多个生产者和一个消费者。为了确保我以正确的顺序准确读取每个事件一次,我使用 MongoDB 时间戳类型作为递增值,由服务器填充。在 SQL 世界中,我可能会使用自动递增的整数。

我的消费者只是轮询 MongoDB 并询问自它看到的最后一个时间戳以来的所有事件。在其中一种环境中,我们意识到有时消费者不会处理所有事件。它不会经常发生,就像错过了 50.000 个事件之一,但理想情况下它根本不应该发生。

我的假设是 MongoDB 在内部做了类似的事情。

ParseDocument(doc);
lock 
{
   SetTimestamp(doc);
}
WriteDocument(doc);
UpdateIndex(doc);

因此,当消费者查询事件时,可能会在很短的时间内无法使用文档,因为只有事件 #1、#2 和 #4 被写入,而事件 #3 在几分之一毫秒后写入.

我已经在 Docker 中使用 C# 客户端和 MongoDB 4.2 看到了这一点,但我想客户端在这里无关紧要。

这个假设是否正确,如果是,我该怎么办?

我的想法是改变我的消费者以询问自最后一个时间戳减去几秒钟以来的所有事件,然后过滤掉消费者中已经收到的事件。

但是有更优雅的解决方案吗?也许某种方式来强制收集级别的写锁或事务可以帮助吗?

4

1 回答 1

1

既然你说“消费者”——单数,我建议:

  • 使用更改流来获得事件通知。更改流,如果正确迭代,将不会跳过更改,也不会两次返回相同的更改。
  • 每当文档从更改流返回时,当它被单个消费者处理时,就向它添加一个计数器。由于只有一个消费者,因此在没有竞争条件等情况下实现计数器相对容易。
  • 还将当前的恢复令牌写入每个正在处理的事件中。
  • 如果您愿意,您可以使用计数器来唯一标识事件。
  • 要再次迭代事件,请使用计数器查找过去的事件。鉴于每个事件都有一个计数器和一个恢复令牌,一旦您到达最近的事件,您就可以无缝地从基于计数器的迭代过渡到基于恢复令牌的迭代。
于 2020-08-26T14:32:33.847 回答