10

基本上我的消费者也是生产者。我们得到一个初始数据集并将其发送到队列。消费者拿走一件物品并处理它,从那时起有 3 种可能性:

  1. 数据很好,并为存储设置了一个“好”队列
  2. 数据不好并被丢弃
  3. 数据不好(还)或坏(还),因此数据被分解成更小的部分并发送回队列以进行进一步处理。

我的问题在于第 3 步,因为队列起初增长得非常快,因此有可能将一段数据分解为在队列中重复的部分,并且消费者继续处理它并最终陷入无限循环。

我认为防止这种情况的方法是防止重复进入队列。我不能在客户端执行此操作,因为在一个小时的过程中,我可能有许多核心处理数十亿个数据点(让每个客户端在提交之前对其进行扫描会使我的速度太慢)。我认为这需要在服务器端完成,但是就像我提到的,数据非常大,我不知道如何有效地确保没有重复。

我可能会问不可能的问题,但我想我会试一试。任何想法将不胜感激。

4

3 回答 3

11

我认为即使您可以解决不将重复项发送到队列的问题,您迟早会遇到这个问题:

来自 RabbitMQ 文档:“从故障中恢复:如果由于客户端连接到的节点发生故障而导致客户端与代理断开连接,如果客户端是发布客户端,则代理可能已接受并在客户端没有收到确认的情况下传递来自客户端的消息;同样在消费端,客户端可能已对消息发出确认,但不知道这些确认是否已发送到代理并在之前处理过失败发生了。简而言之,您仍然需要确保您的消费客户端能够识别和处理重复消息。

基本上,它看起来像这样,您向 rabbitmq 发送请求,rabbitmq 回复 ACK 但出于某种原因,您的消费者或生产者没有收到此 ACK。Rabbitmq 无法知道没有收到 ack,并且您的生产者最终将重新发送消息,从未收到 ack。

处理重复消息很痛苦,尤其是在将消息用作一种 RPC 的应用程序中,但在使用这种消息架构时,这似乎是不可避免的。

于 2012-10-30T21:49:14.120 回答
3

核心问题似乎是这样的:

"...its possible that a piece of data is broken down into a part that's 
duplicated in the queue and the consumers continue to process it and 
end up in a infinite loop."

您可以随心所欲地关注排队物品的独特性,但上述问题是您应该集中精力的地方,IMO。防止无限循环的一种方法可能是在消息有效负载中设置一个“已访问”位,该位由消费者在重新排队分解的项目之前设置。

另一种选择是让消费者重新排队回到一个特殊的队列,该队列的处理方式略有不同,以防止无限循环。无论哪种方式,您都应该通过将其作为应用程序策略的核心部分来解决问题,而不是使用消息传递系统的功能来绕过它。

于 2012-04-14T17:32:26.397 回答
2

rabbitmq 有一个插件,可以让你用一些额外的标题来做这种类型的控制。

您应该启用插件并x-deduplication-header在消息上定义,使用哈希或唯一标识发送的消息的东西,因此当具有相同标头值的其他消息进入rabbitmq的交换时,它不会被路由到任何队列。

请参阅:https ://github.com/noxdafox/rabbitmq-message-deduplication

于 2020-04-20T04:14:02.477 回答