3

我有一个并发 4 的工作人员。我看到 4 个进程开始在花中,一切看起来都很好。

如果我在 shell 中执行此操作,那么我会看到 4 个工作人员接受任务,其余的被保留,它一次处理 4 个,直到队列为空。

[my_task.apply_async() for i in xrange(10)]

但是,如果我逐行执行此操作,则只有前两个任务会被积极处理,然后从那时起它一次只处理两个。

my_task.apply_async()
my_task.apply_async()
my_task.apply_async()
my_task.apply_async()
...

有任何想法吗?

4

1 回答 1

3

通常这是因为子进程填满了并发槽。Celery 默认使用 prefork 作为执行池,每次你生成一个任务的子进程(另一个 fork)时,它都算作一个正在运行的进程来填充并发槽。

避免这种情况的最简单方法是使用 eventlet,这将允许您在每个任务上产生多个异步调用。但是,这要求您的任何任务都没有阻塞调用,例如subprocess.communicate,因为它们将阻塞所有任务。

CELERYD_CONCURRENCY否则,如果您有必要的阻塞调用,并且您知道您的任务一次只有一个正在运行的子进程,您可以8将立即开始(例如使用@app.task(rate_limit='10/m'))。但是,这有点 hack,使用 eventlet 肯定是首选。

于 2014-08-08T13:48:24.267 回答