我有以下典型场景:
- 用于购买产品的订购服务。充当分布式事务的指挥官。
- 包含产品列表及其库存的产品服务。
一种支付服务。
Orders DB Products DB | | --------------- ---------------- ---------------- | OrderService | | ProductService | | PaymentService | --------------- ---------------- ---------------- | | | | -------------------- | --------------- | Kafka orders topic |------------- ---------------------
正常的流程是:
- 用户订购产品。
- 订单服务在数据库中创建订单并在 Kafka 主题“订单”中发布消息以预订产品(PRODUCT_RESERVE_REQUEST)。
- 产品服务在其数据库中减少一个单位的产品库存,并在“订单”中发布一条消息说 PRODUCT_RESERVED
- 订单服务获取 PRODUCT_RESERVED 消息并命令支付发布消息 PAYMENT_REQUESTED
- 支付服务订购付款并回复一条消息 PAYED
- 订单服务读取 PAYED 消息并将订单标记为 COMPLETED,完成交易。
我在处理错误情况时遇到了麻烦,例如:让我们假设:
- 支付服务未能为产品收费,因此它发布消息 PAYMENT_FAILED
- 订单服务响应发布消息 UNDO_PRODUCT_RESERVATION
- 产品服务增加数据库中的库存以取消预订并发布 PRODUCT_UNRESERVATION_COMPLETED
- 订单服务完成交易,将订单的最终状态保存为 CANCELLED_PAYMENT_FAILED。
在这种情况下,假设无论出于何种原因,订单服务发布了 UNDO_PRODUCT_RESERVATION 消息但没有收到 PRODUCT_UNRESERVATION_COMPLETED 消息,因此它重新尝试发布另一个 UNDO_PRODUCT_RESERVATION 消息。
现在,假设同一订单的这两个 UNDO_PRODUCT_RESERVATION 消息最终到达 ProductService。如果我同时处理它们,我最终可能会为产品设置无效库存。
在这种情况下如何实现幂等性?
更新:
按照 Artem 的说明,我现在可以检测到重复的消息(通过检查消息头)并忽略它们,但可能仍然存在以下情况,我不应该忽略重复的消息:
- 订单服务发送 UNDO_PRODUCT_RESERVATION
- 产品服务收到消息并开始处理它,但在更新库存之前崩溃。
- 订单服务未收到响应,因此它重试发送 UNDO_PRODUCT_RESERVATION
- 产品服务知道这是一条重复的消息,但在这种情况下,它应该再次重复处理。
你能帮我想出一种方法来支持这种情况吗?我如何区分何时应该丢弃消息或重新处理它?