3

我知道|它是一个按位“或”运算符,但这让我想知道它在 celery 的情况下如何工作,同时链接多个任务。

(first_task.s(url) | second_tasks.s()).apply_async()

我知道第二个任务会将第一个函数的结果作为参数,但这怎么可能?'|' 在哪里 在 dj-celery 源代码中重载?

@task
def second_task(results):
   do_something(results)

有人可以提供一些见解吗?

4

2 回答 2

2

如上所述,Celery 覆盖了__or__操作符,具体如下:

def __or__(self, other):
    if isinstance(other, group):
        other = maybe_unroll_group(other)
    if not isinstance(self, chain) and isinstance(other, chain):
        return chain((self, ) + other.tasks, app=self._app)
    elif isinstance(other, chain):
        return chain(*self.tasks + other.tasks, app=self._app)
    elif isinstance(other, Signature):
        if isinstance(self, chain):
            return chain(*self.tasks + (other, ), app=self._app)
        return chain(self, other, app=self._app)
    return NotImplemented

完整的实现在这里:https ://github.com/celery/celery/blob/master/celery/canvas.py#L324

于 2016-07-14T02:46:48.757 回答
1

他们可能使用运算符重载__or__(self, other)http ://www.rafekettler.com/magicmethods.html

我不知道Celery的实现细节,只是给你一个想法:

class Task(object):
    def __init__(self, name):
        self.name = name
        self.chain = [self]

    def __or__(self, other):
        self.chain.append(other)
        return self

    def __repr__(self):
        return self.name

    def apply_async(self):
        for c in self.chain:
            print "applying", c


(Task('A') | Task('B') | Task('C')).apply_async()

输出:

applying A
applying B
applying C
于 2016-07-14T00:23:37.160 回答