如何在非事务性、轻量级的环境中保证消息传递?
例如:
- 正常情况:写入数据库,提交,向 ZeroMQ|Redis|OtherMQ 发送消息,消费者拉取消息继续处理...
- 0,05% 情况:写入数据库,提交,应用程序死掉!,没有消息发送,没有消费者拉消息,处理不完整。
在这种情况下如何不丢失消息(避免不发送消息)?
编辑:消息必须只发送一次。
如何在非事务性、轻量级的环境中保证消息传递?
例如:
在这种情况下如何不丢失消息(避免不发送消息)?
编辑:消息必须只发送一次。
在这种情况下,您有 2 个共享资源(数据库和队列)并且您希望它们一起进行事务处理。如果消息发送到队列,您希望数据库提交。如果未成功发送,您希望数据库不提交,反之亦然。这简直就是像 2PC 一样的全局事务机制。然而,实现一个全局事务机制并不是那么容易,而且成本也很高。
我建议您在生产者端实施至少一种策略,在消费者端实施幂等性以提供一致性。
您应该在生产者端的数据库上创建一个消息表,并将消息保存到该表中,然后再发送到队列。然后使用计划线程(这里可能有多个线程来增加吞吐量,但如果您的消息需要按照它们产生的顺序使用)或任何其他可以将它们发送到队列并将它们标记为已发送的东西,以确保已经发送的消息将不再发送。即使您这样做了,在某些情况下您的消息可能会被多次发送(例如,您将消息发送到队列并且您的应用程序在将消息标记为已发送之前崩溃了)。但这不是问题,因为我们已经想在生产者端实现至少一次策略,这意味着我们希望消息至少发送一次到队列中。
为了防止消费者消费在生产者端多次产生的相同消息,您应该实现幂等消费者。简单地说,您可以将已消费消息的 id 保存到消费者端的数据库表中,在处理来自队列的消息之前,您可以检查它是否已经被消费。如果它已被消耗,您应该忽略它并获取下一条消息。
当然,还有其他选项可以在微服务环境中提供一致性。您可以在这个很棒的博客上找到其他解决方案 - https://www.nginx.com/blog/event-driven-data-management-microservices/。我上面解释的解决方案也存在于这个博客中。您可以在使用本地事务发布事件部分中找到它。
这里也许是一个简单的方法。
假设您有交易:
因此,假设您的应用程序在您处于第 2 步或第 3 步时崩溃。如果是这样,您不知道最后一条消息是否确实收到了客户队列,并且您必须在没有最后一次确认的情况下重新启动所有消息后重新发送(第 3 步)。
问题出在消费者方面,因为他们可能会收到两次消息。为了解决这个问题,您可以在每条消息中发送一个始终在增加的事务 ID。消费者必须注意最后一条消息的事务 ID。当传入消息的事务 ID 不高于最后一条消息的事务 ID 时,可以忽略该消息。
现在的问题是您是否可以修改消息结构以及您可以使用哪个事务 ID。