4

假设我在 celery 队列上的所有任务都在访问第 3 方 API。但是,API 有一个速率限制,我正在跟踪它(我需要遵守一天限制和每小时限制)。一旦我达到速率限制,我想暂停新任务的消费,然后当我知道我很好时恢复。

我通过使用以下两个任务实现了这一点:

@celery.task()
def cancel_api_queue(minutes_to_resume):
    resume_api_queue.apply_async(countdown=minutes_to_resume*60, queue='celery')
    celery.control.cancel_consumer('third_party', reply=True)

@celery.task(default_retry_delay=300, max_retries=5)
def resume_api_queue():
    celery.control.add_consumer('third_party', destination=['y@local'])

然后我可以继续提交我的 3rd 方 API 任务,一旦我的消费者被添加回来,我的所有任务都会被消耗掉。伟大的。

然而,由于我在这个队列中没有消费者,这似乎意味着我再也看不到在Flower中提交的作业(直到我的消费者被添加)。

有什么我做错了吗?我可以通过另一种方式实现这种“暂停”,让我继续看到提交的工作吗?

ps也许这与这个问题有关,但不是100%肯定:https ://github.com/celery/celery/issues/1452

如果这有所作为,我正在使用 amqp 代理。

谢谢女孩和男孩。

4

1 回答 1

2

我怀疑在工作人员接听队列消息之前查看它们的内容并不是 Flower 预期设计的一部分。因此,如果您停止使用队列中的任务,Flower 可以做的最好的事情是在“代理”窗格上显示其中有多少已作为单个数字排队。

观察传入任务内部的一种骇人听闻的方法可能是添加一个中间虚拟“转发”任务,该任务只是将消息从一个队列(让我们称之为query_inbox)转发到另一个队列(例如,query_processing)。

例如:

@celery.task(queue='query_inbox')
def query(params):
    process_query.delay(params)

@celery.task(queue='query_processing')
def process_query(params):
    ... do rate-limited stuff ...

现在您可以停止使用来自 的任务query_processing,但您仍然可以在它们流经query_inbox工作线程时观察它们的参数。

于 2018-05-23T22:49:50.250 回答