47

我正在使用 Celery 独立(不在 Django 中)。我计划在多台物理机器上运行一种工作任务类型。该任务执行以下操作

  1. 接受 XML 文档。
  2. 改造它。
  3. 进行多个数据库读取和写入。

我正在使用 PostgreSQL,但这同样适用于使用连接的其他商店类型。过去,我使用数据库连接池来避免在每个请求上创建新的数据库连接或避免连接打开时间过长。但是,由于每个 Celery 工作人员都在单独的进程中运行,我不确定他们实际上如何能够共享池。我错过了什么吗?我知道 Celery 允许您保留从 Celery 工作人员返回的结果,但这不是我在这里想要做的。根据处理的数据,每个任务可以执行几个不同的更新或插入。

从 Celery 工作人员内部访问数据库的正确方法是什么?

是否可以在多个工作人员/任务之间共享一个池,或者是否有其他方法可以做到这一点?

4

6 回答 6

37

我喜欢tigeronk2 的每个工人一个连接的想法。正如他所说,Celery 维护自己的工作池,因此实际上不需要单独的数据库连接池。Celery Signal 文档解释了如何在创建 worker 时进行自定义初始化,因此我将以下代码添加到我的 tasks.py 中,它似乎完全按照您的预期工作。当工作人员关闭时,我什至能够关闭连接:

from celery.signals import worker_process_init, worker_process_shutdown

db_conn = None

@worker_process_init.connect
def init_worker(**kwargs):
    global db_conn
    print('Initializing database connection for worker.')
    db_conn = db.connect(DB_CONNECT_STRING)


@worker_process_shutdown.connect
def shutdown_worker(**kwargs):
    global db_conn
    if db_conn:
        print('Closing database connectionn for worker.')
        db_conn.close()
于 2014-11-10T22:11:06.407 回答
3

每个工作进程有一个数据库连接。由于 celery 本身维护一个工作进程池,因此您的数据库连接将始终等于 celery 工作人员的数量。另一方面,它会将数据库连接池绑定到 celery 工作进程管理。但这应该没问题,因为 GIL 在一个进程中一次只允许一个线程。

于 2013-01-27T17:05:16.290 回答
2

您可以覆盖默认行为以在 celery 配置中拥有线程工作人员而不是每个进程的工作人员:

CELERYD_POOL = "celery.concurrency.threads.TaskPool"

然后,您可以将共享池实例存储在您的任务实例上,并从每个线程任务调用中引用它。

于 2013-01-25T17:01:27.163 回答
1

也许,celery.concurrency.gevent可以提供池共享并且不会加剧 GIL。但是,它的支持仍然是“实验性的”。

还有一个psycopg2.pool.SimpleConnectionPool在greenlets(协程)之间共享,它们都将在单个进程/线程中运行。

关于该主题的少量其他堆栈讨论。

于 2013-01-25T23:05:13.203 回答
0

也许你可以使用pgbouncer。对于 celery,什么都不应该改变,并且连接池是在进程之外完成的。我有同样的问题

(“也许”是因为我不确定是否会有任何副作用)

于 2014-09-02T00:53:18.020 回答
0

通过实施和监控来回馈我的发现。

欢迎反馈。

参考:使用池http://www.prschmid.com/2013/04/using-sqlalchemy-with-celery-tasks.html

每个工作进程(由 -ck 指定的 prefork 模式)将建立一个与 DB 的新连接,而无需池化或重用。因此,如果使用池化,则只能在每个工作进程级别看到该池。所以 pool size > 1 没有用,但重用连接仍然可以保存打开和关闭的连接。

如果每个工作进程使用一个连接,则在初始化阶段为每个工作进程(prefork 模式 celery -A app worker -ck)建立 1 个 DB 连接。它可以重复打开和关闭连接。

无论有多少个工作线程(eventlet),每个工作线程(celery -A app worker -P eventlet)只建立一个与DB的连接,无需池化或重用。因此对于 eventlet,一个 celery 进程(celery -A app worker ...)上的所有工作线程(eventlets)在每一时刻都有 1 个 db 连接。

根据芹菜文档

但是您需要确保您的任务不执行阻塞调用,因为这将停止工作程序中的所有其他操作,直到阻塞调用返回。

这可能是由于 MYSQL DB 连接的方式是阻塞调用。

于 2016-07-26T05:56:10.727 回答