在某些特殊情况下,我需要以某种方式在接收点告诉消费者某些消息不应该被处理。否则两个系统将变得不同步(我们处理一些过时的外部系统,例如,如果连接断开,我们必须丢弃该连接范围内的所有排队操作)。
冒险并手动解决问题消息?补偿行动(在我的情况下可能很难支持)?还要别的吗?
在某些特殊情况下,我需要以某种方式在接收点告诉消费者某些消息不应该被处理。否则两个系统将变得不同步(我们处理一些过时的外部系统,例如,如果连接断开,我们必须丢弃该连接范围内的所有排队操作)。
冒险并手动解决问题消息?补偿行动(在我的情况下可能很难支持)?还要别的吗?
有几种方法:
您可以在发送消息时设置生存时间:await endpoint.Send(myMessage, c => c.TimeToLive = TimeSpan.FromHours(1));
,但这将适用于像这样发送(或发布)的所有消息。在查看您的要求后,我会考虑这一点。这是技术性的,但它是一种正确的消息传递模式。
为您的消息本身设置 TTL 和生成时间戳属性,并让消费者决定该消息是否仍然值得处理。这是更多的业务,并且可能是最正确的方法。
结合技术和业务 - 将时间戳和 TTL 保留在消息头中,这样它们就不会污染您的消息合约,并使用自定义中间件将它们过滤掉。在这种情况下,您需要小心记录此类丢弃,这样您就不会想知道为什么消息会不时消失。
几乎任何不可靠的集成都可以使用 sagas 进行监控,并带有超时。例如,我们使用 saga 与 Twilio 集成。由于我们无法为它们打开 webhook,因此我们会在一段时间后轮询以检查消息状态。您可以在收到消息时启动 saga 并安排消息以检查处理是否仍在等待。正如评论中所讨论的,您可以使用“需要人工干预”的方式来解决问题,或者让 saga 决定放弃该消息。
类似的方法可能是使用查找表,在其中放置与处理无关的消息列表。这样的表将类似于 sagas 列表。似乎这种方式也需要调度。在这里和传奇中,我建议对DropIt
消息使用单独的接收端点(队列),只有一个消费者。它将防止DropIt
消息卡在等待处理的集成消息后面(有些应该已经被丢弃)
使用 RMQ 管理 API 从队列中删除消息。这是最糟糕的方法,我不会推荐它。
在某些特殊情况下,我需要以某种方式在接收点告诉消费者某些消息不应该被处理。
您不能将其还原为:
告诉消费者可以处理较早的消息。
通过这种方式,您可以轻松地将其转换为作用于两条消息的状态机(如 saga)。如果第二条消息从未到达,那么您可以在一段时间后丢弃第一条消息或执行其他操作。
这里的策略是暂停/等待,直到确定不需要恢复任何操作。
据我了解,您正在构建一个向第三方系统发送消息的系统。换句话说,您无法控制的系统。它有一个 API,但补偿动作并不总是可能的,因为 API 不提供它,或者因为动作是在无法补偿或回滚的 3rd 方系统内执行的?
如果可能,请尝试通过 sagas 解决此问题。确保 saga 以正确的顺序执行不同的步骤(发送消息)。以便最后发送无法补偿的消息。这样,如果它们失败了可以补偿的消息,将由 saga 补偿。当您尽可能确定它们不必得到补偿时,应该最后发送无法补偿的那些。因为最后一条消息是同步所有系统的最后一步。
总而言之,这是分布式系统的问题之一,要保持一切同步。补偿行动是解决这个问题的方法。如果无法采取补偿措施,那么您将处于非常困难的境地。尝试通过变得更加灵活并接受您需要补偿的事情来查看业务是否可以提供帮助,他们会告诉您这是不可能的。