1

以我是 Celery 新手的事实作为我的问题的前言,并且 (1) 可能已在其他地方得到回答(如果是这样,我找不到答案)或 (2) 可能有更好的方法来实现我的目标比我直接问的。

另外,我知道celery.contrib.methods,但task_method并没有完全完成我正在寻找的东西。

我的目标

我想创建一个类 mixin,将整个类变成 Celery 任务。例如,由下面的代码表示的 mixin(现在不运行):

from celery import Task

class ClassTaskMixin(Task):

    @classmethod
    def enqueue(cls, *args, **kwargs):
        cls.delay(*args, **kwargs)

    def run(self, *args, **kwargs):
        Obj = type(self.name, (), {})
        Obj(*args, **kwargs).run_now()

    def run_now(self):
        raise NotImplementedError()

与使用时不同task_method,我不想.delay()在任务排队并被调用之前完全实例化类。相反,我想简单地将类名连同任何相关的初始化参数传递给异步进程。然后异步进程将使用类名和给定的初始化参数完全实例化类,然后.run_now()在实例化对象上调用一些方法(例如)。

示例用例

异步构建和发送电子邮件将是我需要的 mixin 的一个示例。

class WelcomeEmail(EmailBase, ClassTaskMixin):

    def __init__(self, recipient_address, template_name, template_context):
        self.recipient_address = recipient_address
        self.template_name = template_name
        self.template_context = template_context

   def send(self):
       self.render_templates()
       self.construct_mime()
       self.archive_to_db()
       self.send_smtp_email()

   def run_now(self):
       self.send()

上面的代码将通过调用在异步 Celery 进程中发送电子邮件WelcomeEmail.enqueue(recipient_address, template_name, template_context)。在进程中同步发送电子邮件将通过调用来完成WelcomeEmail(recipient_address, template_name, template_context).send()

问题

  1. 在 Celery 框架中,我正在尝试做的事情是否非常非常错误?
  2. 有没有更好的方法来构建 mixin 以使其比我提出的更 Celery-onic(更好的属性名称、不同的方法结构等)?
  3. 正如我所描述的那样,在用例中使 mixin 起作用时,我缺少什么?
4

1 回答 1

2

显然这个问题对很多人来说并不是很有趣,但是……我已经完成了我打算做的事情。

有关详细信息,请参阅拉取请求https://github.com/celery/celery/pull/1897

于 2014-03-07T12:11:58.603 回答