问题
我使用 celery 启动如下所示的任务集:
- 我执行了一批可以并行运行的任务,这批任务的数量从几十到几千不等。
我将这些任务的结果汇总到一个答案中,然后对这个答案做一些事情——比如存储到数据库、保存到特殊的结果文件等等。基本上在任务完成执行后,我必须调用具有以下签名的函数:
def callback(result_file_name, task_result_list): #store in file def callback(entity_key, task_result_list): #store in db
现在第 1 步在 Celery 队列中完成,第 2 步在 celery 外部完成:
tasks = []
# add taksks to tasks list
task_group = group()
task_group.tasks = tasks
result = task_group.apply_async()
res = result.join()
# Aggregate results
# Save results to file, database whatever
这种方法很麻烦,因为我必须停止单个线程,直到所有任务都执行完毕(这可能需要几个小时)。
我也想以某种方式将第 2 步移到 celery --- 基本上我需要向整个任务集添加一个回调(据我所知,Celery 不支持它)或提交一个在所有这些子任务之后执行的任务。
有谁知道该怎么做?我在 django 环境中使用它,所以我可以在数据库中存储一些状态。
总结一下我最近的发现
和弦不行
我不能直接使用和弦,因为和弦使我能够创建这样的回调:
def callback(task_result_list):
#store in file
没有明显的方法可以将附加参数传递给回调(特别是因为这些回调不能是本地函数)。
使用数据库
我可以使用存储结果,TaskSetMeta
但该实体没有状态字段 --- 所以即使我向 TaskSetMeta 添加一个信号,我也必须汇集可能具有显着开销的任务结果。