16

我在 RabbitMQ 上有一个类似于作业队列的东西,并且在请求取消作业时,我想撤回尚未开始处理的任务(它们的消息尚未被确认),这对应于撤回这些消息来自它们被路由到的队列。

我还没有在 AMQP 或 RabbitMQ API 中找到这个功能;也许我搜索得不够好?还是我必须使用解决方法(这并不难,但仍然如此)?

4

4 回答 4

11

我会通过让工作人员检查某种权威数据源来确定工作是否应该继续来解决这种情况。例如,工作人员会检查数据库中的作业状态,以查看作业是否已被取消。

对于处理作业的速度可能快于可以更新和读取权威存储的速度的场景,以速度换取其他特性的较少保证的数据存储可能是有用的。

这方面的一个例子是使用 Redis 作为取消处理消息的存储,而不是像 MySQL 这样的关系数据库。Redis 非常快,但对其所拥有的数据提供的保证较少,而 MySQL 则慢得多,但对其所拥有的数据提供更多的保证。

最后,与另一个来源检查是否处理消息的概念是相同的,但是您实现的方式取决于您的特定场景。

于 2012-02-14T15:30:15.367 回答
5

RabbitMQ 不允许您在消息入队后修改或删除消息。为此,您需要某种数据库来保存每个作业的状态,并使用 RabbitMQ 通知相关方该状态的更改。

对于低容量,您可以将它与每个作业的队列组合在一起。创建队列,将工作描述发布到队列,向工作人员宣布队列的名称。如果作业需要在处理之前取消,删除作业的队列;当工人来获取工作描述时,他们会注意到队列已经消失了。

更轻量级且通常更好的是使用 redis 或其他键/值存储来保存作业状态(删除或不存在的记录意味着取消或不存在的作业)并使用 rabbitmq 通知键中的新/删除/更改记录/值存储。

于 2011-06-19T00:55:41.287 回答
3

至少有两种方法可以实现您的目标:

于 2010-09-28T11:11:17.403 回答
1

您需要订阅所有已路由消息的队列,并使用 ack 消费它们。

例如,如果您使用“test”作为路由键发布到主题交换,并且有 3 个订阅“test”的持久队列,您将需要使用这三个队列。最好添加另一个队列,您的消费者进程也会监听,并告诉他们忽略这些消息。

由于您使用的是 RabbitMQ,因此另一种方法是编写一个自定义交换插件,该插件将接受一些带外指令以清除所有队列。例如,您可能让该交换读取一个特殊的消息头,告诉它清除该消息的目的地的所有队列。这确实需要编写 Erlang 代码,但是实现了 4 种不同的交换类型,因此您只需要复制最相似的一种并为新的行为编写代码。如果您为此仅使用自定义标头,则消息的正文可以是消费者的正常消息。

总结一下:

1) 发布者需要自己消费消息 2) 发布者可以在特殊队列中发送一条特殊消息,告诉消费者忽略该消息 3) 发布者可以向自定义交换器发送一条特殊消息,该交换器将清除其中的任何现有消息在将此特殊消息发送给消费者之前的队列。

于 2011-07-18T05:16:16.903 回答