18

我正在使用 python Flask 建立一个网站。一切都很顺利,现在我正在尝试实施 celery。

在我尝试使用 celery 的烧瓶邮件发送电子邮件之前,情况也很好。现在我收到“在应用程序上下文之外工作”错误。

完整的追溯是

  Traceback (most recent call last):
  File "/usr/lib/python2.7/site-packages/celery/task/trace.py", line 228, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/usr/lib/python2.7/site-packages/celery/task/trace.py", line 415, in __protected_call__
    return self.run(*args, **kwargs)
  File "/home/ryan/www/CG-Website/src/util/mail.py", line 28, in send_forgot_email
    msg = Message("Recover your Crusade Gaming Account")
  File "/usr/lib/python2.7/site-packages/flask_mail.py", line 178, in __init__
    sender = current_app.config.get("DEFAULT_MAIL_SENDER")
  File "/usr/lib/python2.7/site-packages/werkzeug/local.py", line 336, in __getattr__
    return getattr(self._get_current_object(), name)
  File "/usr/lib/python2.7/site-packages/werkzeug/local.py", line 295, in _get_current_object
    return self.__local()
  File "/usr/lib/python2.7/site-packages/flask/globals.py", line 26, in _find_app
    raise RuntimeError('working outside of application context')
RuntimeError: working outside of application context

这是我的邮件功能:

@celery.task
def send_forgot_email(email, ref):
    global mail
    msg = Message("Recover your Crusade Gaming Account")
    msg.recipients = [email]
    msg.sender = "Crusade Gaming stuff@cg.com"
    msg.html = \
        """
        Hello Person,<br/>

        You have requested your password be reset. <a href="{0}" >Click here recover your account</a> or copy and paste this link in to your browser: {0} <br />

        If you did not request that your password be reset, please ignore this.
        """.format(url_for('account.forgot', ref=ref, _external=True))
    mail.send(msg)

这是我的芹菜文件:

from __future__ import absolute_import

from celery import Celery

celery = Celery('src.tasks',
                broker='amqp://',
                include=['src.util.mail'])


if __name__ == "__main__":
    celery.start()
4

5 回答 5

16

这是一个与烧瓶应用程序工厂模式一起使用的解决方案,还可以创建带有上下文的 celery 任务,而无需使用app.app_context(). 在避免循环导入的同时获取该应用程序确实很棘手,但这解决了它。这适用于撰写本文时最新的 celery 4.2。

结构:

repo_name/
    manage.py
    base/
    base/__init__.py
    base/app.py
    base/runcelery.py
    base/celeryconfig.py
    base/utility/celery_util.py
    base/tasks/workers.py

base本示例中的主应用程序包也是如此。在base/__init__.py我们创建 celery 实例如下:

from celery import Celery
celery = Celery('base', config_source='base.celeryconfig')

base/app.py文件包含烧瓶应用程序工厂create_app,并注意init_celery(app, celery)它包含:

from base import celery
from base.utility.celery_util import init_celery

def create_app(config_obj):
    """An application factory, as explained here:
    http://flask.pocoo.org/docs/patterns/appfactories/.
    :param config_object: The configuration object to use.
    """
    app = Flask('base')
    app.config.from_object(config_obj)
    init_celery(app, celery=celery)
    register_extensions(app)
    register_blueprints(app)
    register_errorhandlers(app)
    register_app_context_processors(app)
    return app

继续base/runcelery.py内容:

from flask.helpers import get_debug_flag
from base.settings import DevConfig, ProdConfig
from base import celery
from base.app import create_app
from base.utility.celery_util import init_celery
CONFIG = DevConfig if get_debug_flag() else ProdConfig
app = create_app(CONFIG)
init_celery(app, celery)

接下来,base/celeryconfig.py文件(例如):

# -*- coding: utf-8 -*-
"""
Configure Celery. See the configuration guide at ->
http://docs.celeryproject.org/en/master/userguide/configuration.html#configuration
"""

## Broker settings.
broker_url = 'pyamqp://guest:guest@localhost:5672//'
broker_heartbeat=0

# List of modules to import when the Celery worker starts.
imports = ('base.tasks.workers',)

## Using the database to store task state and results.
result_backend = 'rpc'
#result_persistent = False

accept_content = ['json', 'application/text']

result_serializer = 'json'
timezone = "UTC"

# define periodic tasks / cron here
# beat_schedule = {
#    'add-every-10-seconds': {
#        'task': 'workers.add_together',
#        'schedule': 10.0,
#        'args': (16, 16)
#    },
# }

现在在文件中定义 init_celery base/utility/celery_util.py

# -*- coding: utf-8 -*-

def init_celery(app, celery):
    """Add flask app context to celery.Task"""
    TaskBase = celery.Task
    class ContextTask(TaskBase):
        abstract = True
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)
    celery.Task = ContextTask

对于工人base/tasks/workers.py

from base import celery as celery_app
from flask_security.utils import config_value, send_mail
from base.bp.users.models.user_models import User
from base.extensions import mail # this is the flask-mail

@celery_app.task
def send_async_email(msg):
    """Background task to send an email with Flask-mail."""
    #with app.app_context():
    mail.send(msg)

@celery_app.task
def send_welcome_email(email, user_id, confirmation_link):
    """Background task to send a welcome email with flask-security's mail.
    You don't need to use with app.app_context() here. Task has context.
    """
    user = User.query.filter_by(id=user_id).first()
    print(f'sending user {user} a welcome email')
    send_mail(config_value('EMAIL_SUBJECT_REGISTER'),
              email,
              'welcome', user=user,
              confirmation_link=confirmation_link) 

然后,您需要从文件夹repo_name的两个不同的 cmd 提示符中启动 celery beat 和 celery worker 。

在一个 cmd 提示符celery -A base.runcelery:celery beat下执行 a 和另一个celery -A base.runcelery:celery worker.

然后,运行需要烧瓶上下文的任务。应该管用。

于 2018-06-03T10:58:14.170 回答
6

Flask-mail 需要 Flask 应用程序上下文才能正常工作。在 celery 端实例化 app 对象并像这样使用 app.app_context:

with app.app_context():
    celery.start()
于 2013-04-25T18:10:59.830 回答
3

我没有任何观点,所以我无法支持@codegeek 的上述答案,所以我决定自己写,因为我搜索这样的问题得到了这个问题/答案的帮助:我刚刚取得了一些成功在 python/flask/celery 场景中解决类似的问题。即使您的错误是由于尝试使用mail而我的错误是尝试url_for在 celery 任务中使用,但我怀疑这两者与同一个问题有关,并且url_for如果您尝试使用那之前mail

由于 celery 任务中没有应用程序的上下文(即使在包含 之后import app from my_app_module),我也遇到了错误。您需要mail在应用程序的上下文中执行操作:

from module_containing_my_app_and_mail import app, mail    # Flask app, Flask mail
from flask.ext.mail import Message    # Message class

@celery.task
def send_forgot_email(email, ref):
    with app.app_context():    # This is the important bit!
        msg = Message("Recover your Crusade Gaming Account")
        msg.recipients = [email]
        msg.sender = "Crusade Gaming stuff@cg.com"
        msg.html = \
        """
        Hello Person,<br/>
        You have requested your password be reset. <a href="{0}" >Click here recover your account</a> or copy and paste this link in to your browser: {0} <br />
        If you did not request that your password be reset, please ignore this.
        """.format(url_for('account.forgot', ref=ref, _external=True))

        mail.send(msg)

如果有人感兴趣,url_for可以在这里找到我在 celery 任务中使用问题的解决方案

于 2014-12-25T16:30:53.287 回答
2

在您的 mail.py 文件中,导入您的“app”和“mail”对象。然后,使用请求上下文。做这样的事情:

from whateverpackagename import app
from whateverpackagename import mail

@celery.task
def send_forgot_email(email, ref):
    with app.test_request_context():
        msg = Message("Recover your Crusade Gaming Account")
        msg.recipients = [email]
        msg.sender = "Crusade Gaming stuff@cg.com"
        msg.html = \
        """
        Hello Person,<br/>
        You have requested your password be reset. <a href="{0}" >Click here recover your account</a> or copy and paste this link in to your browser: {0} <br />
        If you did not request that your password be reset, please ignore this.
        """.format(url_for('account.forgot', ref=ref, _external=True))

        mail.send(msg)
于 2013-04-25T19:06:44.810 回答
0

无需使用app.app_context(),只需在注册蓝图之前配置 celery,如下所示:

celery = Celery('myapp', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

从您希望使用 celery 的蓝图中,调用已创建的 celery 实例来创建您的 celery 任务。

它将按预期工作。

于 2019-10-24T10:23:51.433 回答