以我是 Celery 新手的事实作为我的问题的前言,并且 (1) 可能已在其他地方得到回答(如果是这样,我找不到答案)或 (2) 可能有更好的方法来实现我的目标比我直接问的。
另外,我知道celery.contrib.methods
,但task_method
并没有完全完成我正在寻找的东西。
我的目标
我想创建一个类 mixin,将整个类变成 Celery 任务。例如,由下面的代码表示的 mixin(现在不运行):
from celery import Task
class ClassTaskMixin(Task):
@classmethod
def enqueue(cls, *args, **kwargs):
cls.delay(*args, **kwargs)
def run(self, *args, **kwargs):
Obj = type(self.name, (), {})
Obj(*args, **kwargs).run_now()
def run_now(self):
raise NotImplementedError()
与使用时不同task_method
,我不想.delay()
在任务排队并被调用之前完全实例化类。相反,我想简单地将类名连同任何相关的初始化参数传递给异步进程。然后异步进程将使用类名和给定的初始化参数完全实例化类,然后.run_now()
在实例化对象上调用一些方法(例如)。
示例用例
异步构建和发送电子邮件将是我需要的 mixin 的一个示例。
class WelcomeEmail(EmailBase, ClassTaskMixin):
def __init__(self, recipient_address, template_name, template_context):
self.recipient_address = recipient_address
self.template_name = template_name
self.template_context = template_context
def send(self):
self.render_templates()
self.construct_mime()
self.archive_to_db()
self.send_smtp_email()
def run_now(self):
self.send()
上面的代码将通过调用在异步 Celery 进程中发送电子邮件WelcomeEmail.enqueue(recipient_address, template_name, template_context)
。在进程中同步发送电子邮件将通过调用来完成WelcomeEmail(recipient_address, template_name, template_context).send()
。
问题
- 在 Celery 框架中,我正在尝试做的事情是否非常非常错误?
- 有没有更好的方法来构建 mixin 以使其比我提出的更 Celery-onic(更好的属性名称、不同的方法结构等)?
- 正如我所描述的那样,在用例中使 mixin 起作用时,我缺少什么?