49

我最近切换到 Celery 3.0。在此之前,我使用Flask-Celery将 Celery 与 Flask 集成。虽然它有很多问题,比如隐藏一些强大的 Celery 功能,但它允许我使用 Flask 应用程序的完整上下文,尤其是 Flask-SQLAlchemy。

在我的后台任务中,我正在处理数据和 SQLAlchemy ORM 来存储数据。Flask-Celery 的维护者已经放弃了对该插件的支持。该插件在任务中腌制 Flask 实例,因此我可以完全访问 SQLAlchemy。

我试图在我的 tasks.py 文件中复制这种行为,但没有成功。您对如何实现这一目标有任何提示吗?

4

4 回答 4

78

更新:我们已经开始使用更好的方法来处理应用程序的拆卸和设置,基于最近的烧瓶文档中描述的模式。

扩展程序.py

import flask
from flask.ext.sqlalchemy import SQLAlchemy
from celery import Celery

class FlaskCelery(Celery):

    def __init__(self, *args, **kwargs):

        super(FlaskCelery, self).__init__(*args, **kwargs)
        self.patch_task()

        if 'app' in kwargs:
            self.init_app(kwargs['app'])

    def patch_task(self):
        TaskBase = self.Task
        _celery = self

        class ContextTask(TaskBase):
            abstract = True

            def __call__(self, *args, **kwargs):
                if flask.has_app_context():
                    return TaskBase.__call__(self, *args, **kwargs)
                else:
                    with _celery.app.app_context():
                        return TaskBase.__call__(self, *args, **kwargs)

        self.Task = ContextTask

    def init_app(self, app):
        self.app = app
        self.config_from_object(app.config)


celery = FlaskCelery()
db = SQLAlchemy()

应用程序.py

from flask import Flask
from extensions import celery, db

def create_app():
    app = Flask()
    
    #configure/initialize all your extensions
    db.init_app(app)
    celery.init_app(app)

    return app

一旦你以这种方式设置了你的应用程序,你就可以运行和使用 celery,而无需在应用程序上下文中显式运行它,因为如果需要,你的所有任务都将自动在应用程序上下文中运行,而你没有明确担心任务后拆解,这是一个需要管理的重要问题(请参阅下面的其他回复)。

故障排除

那些不断获得的人with _celery.app.app_context(): AttributeError: 'FlaskCelery' object has no attribute 'app'确保:

  1. celery导入保持在app.py文件级别。避免:

应用程序.py

from flask import Flask

def create_app():
    app = Flask()

    initiliaze_extensions(app)

    return app

def initiliaze_extensions(app):
    from extensions import celery, db # DOOMED! Keep celery import at the FILE level
    
    db.init_app(app)
    celery.init_app(app)
  1. 在你之前开始你的芹菜工人flask run并使用
celery worker -A app:celery -l info -f celery.log

注意app:celery,即从 加载app.py

您仍然可以从扩展导入来装饰任务,即from extensions import celery.

下面的旧答案仍然有效,但不是一个干净的解决方案

我更喜欢通过创建一个单独的文件来在应用程序上下文中运行所有 celery,该文件使用应用程序的上下文调用 celery.start()。这意味着您的任务文件不必到处都是上下文设置和拆卸。它也很适合烧瓶“应用程序工厂”模式。

扩展程序.py

from from flask.ext.sqlalchemy import SQLAlchemy
from celery import Celery

db = SQLAlchemy()
celery = Celery()

任务.py

from extensions import celery, db
from flask.globals import current_app
from celery.signals import task_postrun

@celery.task
def do_some_stuff():
    current_app.logger.info("I have the application context")
    #you can now use the db object from extensions

@task_postrun.connect
def close_session(*args, **kwargs):
    # Flask SQLAlchemy will automatically create new sessions for you from 
    # a scoped session factory, given that we are maintaining the same app
    # context, this ensures tasks have a fresh session (e.g. session errors 
    # won't propagate across tasks)
    db.session.remove()

应用程序.py

from extensions import celery, db

def create_app():
    app = Flask()
    
    #configure/initialize all your extensions
    db.init_app(app)
    celery.config_from_object(app.config)

    return app

RunCelery.py

from app import create_app
from extensions import celery

app = create_app()

if __name__ == '__main__':
    with app.app_context():
        celery.start()
于 2013-01-03T20:07:57.170 回答
6

我使用Paul Gibbs 的答案有两个不同之处。我使用了worker_process_init 而不是task_postrun。而不是 .remove(),我使用了 db.session.expire_all()。

我不是 100% 确定,但据我了解,它的工作方式是当 Celery 创建一个工作进程时,所有继承/共享的数据库会话都将过期,并且 SQLAlchemy 将根据需要创建该工作进程独有的新会话。

到目前为止,它似乎已经解决了我的问题。使用 Paul 的解决方案,当一名工作人员完成并删除会话时,使用同一会话的另一名工作人员仍在运行其查询,因此 db.session.remove() 在使用时关闭了连接,给我一个“与 MySQL 的连接丢失查询期间的服务器”异常。

感谢保罗引导我朝着正确的方向前进!

没关系,那没有用。如果 Celery 调用它,我最终在我的 Flask 应用程序工厂中有一个参数不运行 db.init_app(app) 。相反,在 Celery 分叉后,工人们会调用它。我现在在我的 MySQL 进程列表中看到了几个连接。

from extensions import db
from celery.signals import worker_process_init
from flask import current_app

@worker_process_init.connect
def celery_worker_init_db(**_):
    db.init_app(current_app)
于 2014-06-22T07:55:04.347 回答
5

在您的 tasks.py 文件中执行以下操作:

from main import create_app
app = create_app()

celery = Celery(__name__)
celery.add_defaults(lambda: app.config)

@celery.task
def create_facet(project_id, **kwargs):
    with app.test_request_context():
       # your code
于 2012-08-21T12:42:08.947 回答
1
from flask import Flask
from werkzeug.utils import import_string
from celery.signals import worker_process_init, celeryd_init
from flask_celery import Celery
from src.app import config_from_env, create_app

celery = Celery()

def get_celery_conf():
    config = import_string('src.settings')
    config = {k: getattr(config, k) for k in dir(config) if k.isupper()}
    config['BROKER_URL'] = config['CELERY_BROKER_URL']
    return config

@celeryd_init.connect
def init_celeryd(conf=None, **kwargs):
    conf.update(get_celery_conf())

@worker_process_init.connect
def init_celery_flask_app(**kwargs):
    app = create_app()
    app.app_context().push()
  • 在 celeryd init 更新 celery 配置
  • 使用您的烧瓶应用程序工厂来初始化所有烧瓶扩展,包括 SQLAlchemy 扩展。

通过这样做,我们能够维护每个工作人员的数据库连接。

如果你想在烧瓶上下文下运行你的任务,你可以子类化Task.__call__

class SmartTask(Task):

    abstract = True

    def __call__(self, *_args, **_kwargs):
        with self.app.flask_app.app_context():
            with self.app.flask_app.test_request_context():
                result = super(SmartTask, self).__call__(*_args, **_kwargs)
            return result

class SmartCelery(Celery):

    def init_app(self, app):
        super(SmartCelery, self).init_app(app)
        self.Task = SmartTask
于 2016-06-06T04:40:11.557 回答