32

我想将 Celery 用作我的任务的队列,这样我的网络应用程序就可以将任务排入队列,返回响应,并且该任务将同时处理/有一天/ ...我构建了一种 API,所以我不提前知道会有什么样的任务——未来,可能会有处理 HTTP 请求的任务,另一个 IO,还有 CPU 消耗的任务。在这方面,我想在进程上运行 Celery 的工作人员,因为这些是 Python 中通用的并行性。

但是,我也想在我的任务中使用 gevent,所以我可以让一个任务产生许多 HTTP 请求等。问题是,当我这样做时:

from gevent import monkey
monkey.patch_all()

芹菜停止工作。它开始了,但没有任务可以有效地排队——它们似乎去代理,但 Celery 工作人员没有收集和处理它们。只能启动和等待。如果我删除这些行并在没有任何 gevent 和并行化的情况下执行任务,那么一切正常。

我认为这可能是因为 gevent 补丁也线程化。所以我尝试了

from gevent import monkey
monkey.patch_all(thread=False)

...但是 Celery 甚至没有启动,它在没有给出原因的情况下崩溃(打开了日志的调试级别)。

是否可以使用 Celery 将任务排入队列并使用 gevent 在单个任务中做一些事情?如何?我做错了什么?

4

4 回答 4

28

我相信启动任务的推荐方式如下。

python manage.py celery worker -P gevent --loglevel=INFO

Gevent 需要尽早打补丁。

于 2013-05-28T21:43:35.230 回答
13

您可以使用包含多个 greenlet 的多个线程运行 celery,如下所示:

$ celery multi start 4 -P gevent -l info -c:1-4 1000
于 2013-01-03T15:24:39.940 回答
2

据我所知,这是不可能的。如果有人找到更好的答案,我会接受它而不是这个我的。

唯一的选择是将 gevent 也用作 Celery 工作人员的后端。为了完成这样的事情,必须做的是在配置文件中添加以下内容:

CELERYD_POOL = 'gevent'

可以在此处找到有关此选项的更多详细信息。有关 gevent 池的更多信息在此页面上。请注意 gevent 池仍被标记为实验性的事实。我发现没有可用于比较不同任务(面向 IO 的任务、面向 CPU 的任务)上的进程和异步 gevent 池的基准,但最后我意识到,即使我的CPU 密集型任务实际上也会比 CPU 更多 IO,因为我使用数据库保存结果和数据库连接将是瓶颈,而不是计算部分。我不会有真正会影响 CPU 的科学任务。

于 2012-11-07T12:09:44.747 回答
2

根据我奇怪的经验,Celery Beat 无法与具有 gevent 池的工作人员正常工作(计划任务被阻止并永远等待),除非您为 Beat 进程激活 gevent 猴子补丁。

但是,celery beat不支持--pool=gevent-P gevent选项。注入 gevent 猴子补丁的正确方法是使用自定义celery二进制文件,例如:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from gevent import monkey
monkey.patch_all()

import re
import sys

from celery.__main__ import main

if __name__ == '__main__':
    sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0])
    sys.exit(main())

保存为celery-gevent,运行 Beat 服务如下:

celery-gevent beat --app=proj.celery:app --loader=djcelery.loaders.DjangoLoader -f /var/log/celery/beat.log -l INFO --workdir=/my/proj --pidfile=/var/run/celery/beat.pid

proj.celery您还应该修补 Django 连接以避免DatabaseError

from __future__ import absolute_import

import os
# Set the Django settings module for the 'celery' program
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

import django
# Load Django model definitions, etc
django.setup()

from django.db import connection
# Allow thread sharing to ensure that Django database connection
# works properly with gevent.
connection.allow_thread_sharing = True

from django.conf import settings
from celery import Celery

app = Celery('proj')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

(上面的例子适用于 Python 2.7.10、Celery 3.1.18、Django 1.8.2 和 gevent 1.0.2)

于 2015-06-08T11:22:12.007 回答