0

我有一个使用 mongoDB 作为持久性和 RabbitMQ 作为消息代理的系统。我有一个挑战,我只想为 RabbitMQ 发布失败场景实现事务发件箱。我不确定这是否可能,因为我也有使用相同 mongoDB 持久性的消费者,所以当我编写涵盖 RabbitMQ 发布失败场景的事务发件箱的代码时,发布的消息在 mongoDB commitTransaction 之前到达消费者,所以我的消费者不能由于延迟,无法在 mongoDB 中找到消息。

我的代码如下所示;

1-开始会话事务

2-插入带有会话的文档(因此在我调用提交之前它不会持续存在)

3-发布rabbitMQ

4- 如果成功 commitTransaction

5-如果错误插入到带有会话的发件箱文档中而不是 commitTransaction

6-如果 mongoDB abortTransaction 出现问题(如果发布成功且 mongoDB 失败,我的消费者首先检查 mongoDB 是否存在,如果不存在则不执行任何操作。)

所以问题在于消息在 mongoDB 持久性之前到达消费者,你有什么解决方案可以解决我的问题吗?

4

2 回答 2

0

据我所知,https ://microservices.io/patterns/data/transactional-outbox.html 中图片中概述的架构直接映射到 MongoDB 更改流:

  • 将交易保持在 1 左右
  • 在事务中插入到发件箱表中
  • 设置消息中继过程,该过程请求发件箱表上的更改流,并为每个插入的文档发布消息到消息代理

可以重试向消息代理的发布,也可以重试读取的更改流,以防出现任何错误。您需要正确跟踪恢复令牌,请参阅例如https://docs.mongodb.com/ruby-driver/master/reference/change-streams/#resuming-a-change-stream

这种方法的局限性:

  • 只有一个消息中继过程,没有可扩展性和冗余 - 如果它死了,你将不会收到通知,直到它回来

您提出的解决方案有一组不同的问题,例如通过在提交之前发布通知,您可以接受通知处理器无法找到它从消息代理中获取的文档的可能性,如您所说。

于 2021-08-30T16:41:20.677 回答
0

所以我想分享我的解决方案。

不幸的是,不可能仅针对失败场景实施事务发件箱模式。

我的决定是,围绕高可用性创建一个架构;

MongoDB 作为高可用持久性,RabbitMQ 作为高可用消息代理。

我删除了我之前编码的所有会话事务,并实现了立即写入和发布。

在最坏的情况下:

1-插入文档(成功)

2-rabbitmq 发布(失败)

3-插入发件箱(失败)

我将拥有的是,我的 mongo 中未发布的文档。即使在最坏的情况下,我也可以使用另一个应用程序从 MongoDB 重新发布消息,但在遇到这种情况之前我不会编写该应用程序,因为我们无法涵盖代码中的所有失败场景。所以我们的消息代理或持久性必须是高可用的。

于 2021-09-02T09:46:23.543 回答