13

我的情况与此处概述的情况类似,除了我不想用多个参数链接任务,而是想链接返回具有多个条目的字典的任务。

这是 - 非常松散和抽象地 - 我正在尝试做的事情:

任务.py

@task()
def task1(item1=None, item2=None):
  item3 = #do some stuff with item1 and item2 to yield item3
  return_object = dict(item1=item1, item2=item2, item3=item3)
  return return_object

def task2(item1=None, item2=None, item3=None):
  item4 = #do something with item1, item2, item3 to yield item4
  return_object = dict(item1=item1, item2=item2, item3=item3, item4=item4)
  return return_object

从 ipython 工作,我能够单独和异步调用 task1,没有问题。

我也可以单独调用 task2,并将 task1 返回的结果作为双星参数:

>>res1 = task1.s(item1=something, item2=something_else).apply_async()
>>res1.status
'SUCCESS'
>>res2 = task2.s(**res1.result).apply_async()
>>res2.status
'SUCCESS

但是,我最终想要实现的是与上面相同的最终结果,但是通过一个链,在这里,我无法弄清楚如何实例化 task2,而不是使用 task1 返回的(位置)参数,而是使用 task1.result 作为**kwargs:

chain_result = (task1.s(item1=something, item2=something_else) | task2.s()).apply_async()  #THIS DOESN'T WORK!

我怀疑我可以回去重写我的任务,以便它们返回位置参数而不是字典,这可能会解决问题,但在我看来,应该有某种方法可以访问 task1 在 task2 中的返回对象**双星的功能。我还怀疑我在这里遗漏了一些关于 Celery 子任务实现或 *args 与 **kwargs 的相当明显的东西。

希望这是有道理的。并提前感谢任何提示。

4

3 回答 3

10

这是我对这个问题的看法,使用抽象任务类:

from __future__ import absolute_import
from celery import Task
from myapp.tasks.celery import app   


class ChainedTask(Task):
    abstract = True    

    def __call__(self, *args, **kwargs):
        if len(args) == 1 and isinstance(args[0], dict):
            kwargs.update(args[0])
            args = ()
        return super(ChainedTask, self).__call__(*args, **kwargs)

@app.task(base=ChainedTask)
def task1(x, y):
    return {'x': x * 2, 'y': y * 2, 'z': x * y}    


@app.task(base=ChainedTask)
def task2(x, y, z):
    return {'x': x * 3, 'y': y * 3, 'z': z * 2}

您现在可以像这样定义和执行您的链:

from celery import chain

pipe = chain(task1.s(x=1, y=2) | task2.s())
pipe.apply_async()
于 2014-10-24T11:11:50.090 回答
2

chain其他画布原语属于功能实用程序家族,例如mapreduce

例如 wheremap(target, items)调用target(item)列表中的每个项目,Python 有一个很少使用的 map 版本,称为itertools.starmap,而不是调用target(*item).

虽然我们可以添加工具箱,starchain甚至可以添加kwstarchain到工具箱中,但这些工具箱非常专业,可能不会经常使用。

有趣的是,Python 使用 list 和 generator 表达式使这些变得不必要,因此 map 被替换为 ,[target(item) for item in item]而 starmap被替换为[target(*item) for item in item]

因此,与其为每个原语实现几个替代方案,我认为我们应该专注于寻找一种更灵活的方式来支持这一点,例如使用 celery 驱动的生成器表达式(如果可能,如果不是类似强大的东西)

于 2013-02-20T13:12:17.727 回答
1

因为这不是 celery 内置的,所以我自己写了一个装饰器函数来做类似的事情。

# Use this wrapper with functions in chains that return a tuple. The
# next function in the chain will get called with that the contents of
# tuple as (first) positional args, rather than just as just the first
# arg. Note that both the sending and receiving function must have
# this wrapper, which goes between the @task decorator and the
# function definition. This wrapper should not otherwise interfere
# when these conditions are not met.

class UnwrapMe(object):
    def __init__(self, contents):
        self.contents = contents

    def __call__(self):
        return self.contents

def wrap_for_chain(f):
    """ Too much deep magic. """
    @functools.wraps(f)
    def _wrapper(*args, **kwargs):
        if type(args[0]) == UnwrapMe:
            args = list(args[0]()) + list(args[1:])
        result = f(*args, **kwargs)

        if type(result) == tuple and current_task.request.callbacks:
            return UnwrapMe(result)
        else:
            return result
    return _wrapper

我的像这个starchain概念一样展开,但你可以很容易地修改它来展开 kwargs。

于 2013-04-03T03:05:55.450 回答