0

我想将应用程序上下文从 Quart 传递到 Celery 应用程序。我尝试将Flask 提供的同步代码示例改编为 Quart 的异步形式。

但是,在运行此代码片段后,asyncio.run()在单独的函数中执行类似运行的操作会给出 RuntimeError,因为已经有一个正在运行的循环。

从 Quart传递app_context到 Celery 的可能方法是什么?

# imports
import asyncio
from quart import Quart
from backend.config import Config
from celery import Celery
import backend.celeryconfig as celeryconfig

# code snippet
def create_celery(app):  # app is a Quart app
    celery = Celery(
        app.import_name,
        backend=app.config['CELERY_RESULT_BACKEND'],
        broker=app.config['CELERY_BROKER_URL']
    )
    celery.conf.update(app.config)
    celery.config_from_object(celeryconfig)

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            async def helper(self, *args, **kwargs):
                async with app.app_context():
                    return await self.run(*args, **kwargs)

            asyncio.run(helper(self, *args, **kwargs))


    celery.Task = ContextTask
    return celery
4

0 回答 0