更新:我们已经开始使用更好的方法来处理应用程序的拆卸和设置,基于最近的烧瓶文档中描述的模式。
扩展程序.py
import flask
from flask.ext.sqlalchemy import SQLAlchemy
from celery import Celery
class FlaskCelery(Celery):
def __init__(self, *args, **kwargs):
super(FlaskCelery, self).__init__(*args, **kwargs)
self.patch_task()
if 'app' in kwargs:
self.init_app(kwargs['app'])
def patch_task(self):
TaskBase = self.Task
_celery = self
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
if flask.has_app_context():
return TaskBase.__call__(self, *args, **kwargs)
else:
with _celery.app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
self.Task = ContextTask
def init_app(self, app):
self.app = app
self.config_from_object(app.config)
celery = FlaskCelery()
db = SQLAlchemy()
应用程序.py
from flask import Flask
from extensions import celery, db
def create_app():
app = Flask()
#configure/initialize all your extensions
db.init_app(app)
celery.init_app(app)
return app
一旦你以这种方式设置了你的应用程序,你就可以运行和使用 celery,而无需在应用程序上下文中显式运行它,因为如果需要,你的所有任务都将自动在应用程序上下文中运行,而你没有明确担心任务后拆解,这是一个需要管理的重要问题(请参阅下面的其他回复)。
故障排除
那些不断获得的人with _celery.app.app_context(): AttributeError: 'FlaskCelery' object has no attribute 'app'
确保:
- 将
celery
导入保持在app.py
文件级别。避免:
应用程序.py
from flask import Flask
def create_app():
app = Flask()
initiliaze_extensions(app)
return app
def initiliaze_extensions(app):
from extensions import celery, db # DOOMED! Keep celery import at the FILE level
db.init_app(app)
celery.init_app(app)
- 在你之前开始你的芹菜工人
flask run
并使用
celery worker -A app:celery -l info -f celery.log
注意app:celery
,即从 加载app.py
。
您仍然可以从扩展导入来装饰任务,即from extensions import celery
.
下面的旧答案仍然有效,但不是一个干净的解决方案
我更喜欢通过创建一个单独的文件来在应用程序上下文中运行所有 celery,该文件使用应用程序的上下文调用 celery.start()。这意味着您的任务文件不必到处都是上下文设置和拆卸。它也很适合烧瓶“应用程序工厂”模式。
扩展程序.py
from from flask.ext.sqlalchemy import SQLAlchemy
from celery import Celery
db = SQLAlchemy()
celery = Celery()
任务.py
from extensions import celery, db
from flask.globals import current_app
from celery.signals import task_postrun
@celery.task
def do_some_stuff():
current_app.logger.info("I have the application context")
#you can now use the db object from extensions
@task_postrun.connect
def close_session(*args, **kwargs):
# Flask SQLAlchemy will automatically create new sessions for you from
# a scoped session factory, given that we are maintaining the same app
# context, this ensures tasks have a fresh session (e.g. session errors
# won't propagate across tasks)
db.session.remove()
应用程序.py
from extensions import celery, db
def create_app():
app = Flask()
#configure/initialize all your extensions
db.init_app(app)
celery.config_from_object(app.config)
return app
RunCelery.py
from app import create_app
from extensions import celery
app = create_app()
if __name__ == '__main__':
with app.app_context():
celery.start()