10

我想在芹菜链命令中使用块。

chain = task1.s(arg1) | task2.chunks(?,CHUNK_SIZE) | task3.chunks(?, CHUNK_SIZE)

基本上我想做的是运行task1,将它的结果分块并将块发送到task2,然后它应该调用task3,task3也应该从task2接收分块结果以完成该过程。为什么?因为 task1 和 task2 都可以返回相当数量的我想分批处理的项目。

上面的代码不起作用,因为我不太确定要放什么而不是问号才能使它起作用。

我不太确定这是否可行,因为搜索并没有提供太多结果,因此在无法构建这样的工作流程的情况下,我会对合理的替代方案感兴趣。

4

1 回答 1

2

我不确定现有的原语是否可以做到这一点。

我可以考虑是否有两种选择/解决方法:

  1. 使用块/和弦从任务中启动新任务。

    你一定已经想到了这一点。这个想法是正常调用 task1 apply_async。一旦该任务完成生成需要分块的大量输出,只需使用原语进一步为任务 2 创建块。同样,对 task2 和 task3 之间的转换执行相同的步骤。当您最终等待获取内部任务的结果时,从任务中调用任务只是一个坏主意。所以请记住,如果您正在等待任务结果,那么这不是推荐的方法。

    @task
    def task1(some_input):
        # Do stuff
        # Create a list of lists where the inner list represent the *args to send to an individual task
        task2.chunks([[i, j], [i, j], [i, j]], CHUNK_SIZE).apply_async()
    
    @task
    def task2(a, b):
        # Do stuff
        # Create a list of lists where the inner list represent the *args to send to an individual task
        task3.chunks([[i, j], [i, j], [i, j]], CHUNK_SIZE).apply_async()
    
    @task
    def task3(a, b):
        # Do stuff
    
  2. 这个解决方案有点有趣。我在 celery Github 问题页面上遇到了一个特定的请求。查看 steeve 的这个拉取请求:https ://github.com/celery/celery/pull/817 据我了解,他创建了一个动态任务装饰器(关于名称是否应该是那个存在争议),它了解任务是否返回子任务。如果是这样,它首先应用该子任务。他声称他在 Veezio 的生产中成功地使用了它。我自己没有试过。我建议前往该线程并提出一些问题。或者甚至在 Twitter 或 IRC 或其他地方向 Steeve 窃听。

于 2016-09-07T14:20:37.530 回答