6

问题

我使用 celery 启动如下所示的任务集:

  1. 我执行了一批可以并行运行的任务,这批任务的数量从几十到几千不等。
  2. 我将这些任务的结果汇总到一个答案中,然后对这个答案做一些事情——比如存储到数据库、保存到特殊的结果文件等等。基本上在任务完成执行后,我必须调用具有以下签名的函数:

    def callback(result_file_name, task_result_list): 
        #store in file
    
    
    def callback(entity_key, task_result_list):
        #store in db 
    

现在第 1 步在 Celery 队列中完成,第 2 步在 celery 外部完成:

    tasks = []

    # add taksks to tasks list 

    task_group = group()
    task_group.tasks = tasks

    result = task_group.apply_async()

    res = result.join()

    # Aggregate results 

    # Save results to file, database whatever

这种方法很麻烦,因为我必须停止单个线程,直到所有任务都执行完毕(这可能需要几个小时)。

我也想以某种方式将第 2 步移到 celery --- 基本上我需要向整个任务集添加一个回调(据我所知,Celery 不支持它)或提交一个在所有这些子任务之后执行的任务。

有谁知道该怎么做?我在 django 环境中使用它,所以我可以在数据库中存储一些状态。

总结一下我最近的发现

和弦不行

我不能直接使用和弦,因为和弦使我能够创建这样的回调:

    def callback(task_result_list): 
        #store in file

没有明显的方法可以将附加参数传递给回调(特别是因为这些回调不能是本地函数)。

使用数据库

我可以使用存储结果,TaskSetMeta但该实体没有状态字段 --- 所以即使我向 TaskSetMeta 添加一个信号,我也必须汇集可能具有显着开销的任务结果。

4

1 回答 1

3

好吧,答案真的很简单,我确实可以使用和弦 --- 并且必须将其他参数(如报告文件名等)作为 kwargs 传递。

这是和弦任务:

@task
def print_and_sum(to_sum, file_name):
    print file_name
    print sum(to_sum)
    return file_name, sum(to_sum)

下面是实例化它的方法:

subtasks = [...]
result = chord(subtasks)(print_and_sum.subtask(kwargs={'file_name' : 'report_file.csv'}))
于 2012-06-22T13:41:46.217 回答