3

我想读取 RabbitMQ 队列中未确认消息的有效负载或 messageId。这可能吗?

我想这样做的原因是我试图使用 RabbitMQ 死信功能来建立一个循环来定期自动生成消息。简而言之,创建两个队列——工作队列和延迟队列。

  1. 将延迟队列中消息的TTL设置为需要周期性的时间频率。对于不同的工作目的,可以有不同的 TTL 不同的消息;
  2. 将消息放入延迟队列。当消息过期时,它会重新发布到工作队列中。只要需要,消息就可以留在工作队列中,直到消费者准备好使用它。
  3. 一位消费者拿起消息并处理它。如果处理成功,消费者需要确认工作队列,然后将消息写回延迟队列;如果处理失败(例如,线程崩溃),则没有确认。然后消息将自动重新出现在工作队列中。然后另一个消费者可以接手这项工作。当发送回延迟队列的消息再次过期时,它会被重新发布,然后被消费者重新消费……一个循环构建,工作负载分配。

我想确保循环中没有丢失或重复的消息,因为我不想错过工作或同时做双重工作。但是,发生重复消息的可能性很小。下面展示了消费者首先将消息写回延迟队列,并确认工作队列。如果线程在两行以下之间崩溃,则消息将在延迟队列中,并且 Rabbit 再次将消息重新发布到工作队列中。最终在循环中出现重复的消息。

  channel.basicPublish(DELAY_EXCHANGE, "", null, message.getBytes());
  channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

为了防止上述情况,我想在上面两行之后添加一个看狗逻辑:

  1. 检查循环中的消息总数(两个队列中的消息总数),看是否等于我的预期数量(我预期的数量减去 10);

  2. 如果数字不匹配,我想弄清楚哪个是丢失的,哪个是重复的,然后处理它。我不关心这些消息的顺序,或者频率受到干扰,因为这是一个非常需要考虑的边缘情况。我可以轻松地检索那些准备好的消息并将它们重新排队。但问题是如何处理那些未确认的消息?

非常感谢您!

罗伊

4

1 回答 1

2

无法从其他上下文中读取未确认的消息,原始消息已被消费并保留为未确认消息。

于 2014-11-11T09:03:49.353 回答