1

我的问题的关键是确定如何在兔子生产者被杀后进行清理。

我有一个有多个生产者和多个消费者(工人)的 rabbitmq 环境。目前,所有消息(又名工作)都输入一个中央队列,消费者一次选择一条消息(工作任务)并进行处理。任何给定的生产者都会生成约 800 条工作消息,大约需要 3-4 小时才能完成。

工人是快乐的人,他们不知道任务,可以处理生产者提出的任何事情。他们以拉动方式工作,因为他们的工作可能需要长达 10 分钟才能完成每项任务。一旦他们准备好进行更多工作,他们就会轮询队列。

生产者更善变——有时他们会在队列中创建大量工作作为消息,然后决定他们根本不需要完成这些工作。更糟糕的是,它们有时会产生大量工作;取消所有这些;然后用不同的参数重新发布作品。

我没有要求制作人重新排队工作的机制。部分问题是他们在将请求推送到队列之前必须完成其他工作。

不能不想把工作留在队列中,因为生产者的单次运行代表 3 或 4 小时的工作。而我最糟糕的情况是,我有几个生产者回溯了他们要求的工作,而我还有几个其他生产者希望将他们的工作保留在队列中。

所以在最坏的情况下,我的队列看起来像这样,我需要清除 Prod B 消息:

... | ~800 Prod A messages | ~800 Prod B messages | ~800 Prod C messages | ...

而且我不能保证来自 A、B 和 C 的消息不会交错,因为我目前没有同步写入队列。

我考虑过的:

  • 一个简单的队列清除将取出我需要保留的消息。
  • 我可以编写一个专门的消费者来提取所有消息并将好的消息重新排队,但这似乎是一种黑客行为。

作为一个额外的挑战,我可能需要支持制作人的工作优先于其他人的工作。

综上所述,在生产者将工作放入队列然后决定取消工作请求后,我该如何清理?

4

1 回答 1

2

我会创建一个辅助队列,它是管理的,你偶尔会在这个队列上放置消息以发出命令,例如PurgeQueue: C,或PrioritizeQueue: A,或PauseUntilISaySo最终跟随OkYouMayPass。然后,您的所有工作人员只需在他们实际处理的其他人之前检查该队列/主题/任何内容(或者如果您只有一个活动的可调度侦听器,请让附加到您的管理队列的那个有一些信号可以发送以指示工作人员暂停,以便它可以处理管理操作,直到完成),并且在极少数情况下,它有一条消息,然后他们将处理该消息,而不是其他队列的工作。

如果您遇到想要删除消息的情况,我会鼓励您ignoreList在工作人员中维护一个,管理消息可用于更新它,因此一旦生产者决定“等等,不要做工作在“它发送一条管理消息说“添加到您的忽略队列:msgId1,msgId2,msgId3]”,工作人员将拉取它,将其添加到他们的忽略队列中,他们对收到的每条工作消息所做的第一件事就是简单:检查他们的忽略队列,看看他们是否应该处理该消息

于 2014-10-23T15:41:09.953 回答