0

我有一个相当复杂的工作流程(动态构建),看起来像这样:

workflow= chain(
signature('workflow.tasks.start_workflow', kwargs= {}),
chord(
    [
        signature('workflow.tasks.group_task', kwargs= {}),
        signature('workflow.tasks.sample_task_2', kwargs= {}),
        signature('workflow.tasks.sample_task_10', kwargs= {})
    ],
    chain(
        signature('workflow.tasks.sample_task_3', kwargs= {}),
        chord(
            [
                signature('workflow.tasks.group_task', kwargs= {}),
                chain(
                    signature('workflow.tasks.sample_task_5', kwargs= {}),
                    signature('workflow.tasks.sample_task_6', kwargs= {}),
                ),
                chain(
                    signature('workflow.tasks.sample_task_7', kwargs= {}),
                    signature('workflow.tasks.sample_task_8', kwargs= {}),
                )
            ],
            chain(
                signature('workflow.tasks.sample_task_9', kwargs= {}),
                signature('workflow.tasks.end_workflow', kwargs= {})
            )
        )
    )
)

)

然后芹菜变成这样:

workflow.tasks.start_workflow() | celery.chain(
    [
        workflow.tasks.group_task(),
        workflow.tasks.sample_task_2(),
        workflow.tasks.sample_task_10()
    ],     
    tasks=(
        workflow.tasks.sample_task_3(),
        celery.chain(
            [
                workflow.tasks.group_task(),
                workflow.tasks.sample_task_5() | workflow.tasks.sample_task_6(),
                workflow.tasks.sample_task_7() | workflow.tasks.sample_task_8()
            ], tasks=(
                workflow.tasks.sample_task_9(),
                workflow.tasks.end_workflow()
            )
        )
    )
)

请注意和弦末尾的任务是如何被推入“任务”标题的。根据我的阅读,这些任务存储在主任务标题中,并且在和弦标题完成执行之前不会放入队列中。

当我尝试显示整个工作流的 task_id 时(我希望它是工作流中的 task_id 之一)。

workflow= workflow.apply_async()
print workflow.id
>> 1d538872-79af-4585-aef8-ebfc06cd0b5b

我得到的这个任务 ID 没有存储在 celery_taskmeta 或 celery_tasksetmeta 中。这不是在工作流中执行的任何任务(请参阅下面的工作日志)。任何想法,这个 task_id 代表什么,如果有的话我可以将它链接到任何正在执行的任务?

我希望能够遍历结果并显示工作流中每个任务的状态。但是,我得到的这个任务 ID 似乎与任何任务无关。下面是工作日志,您会看到上面打印的任务 ID 找不到!有任何想法吗?谢谢。

[2015-03-03 15:34:42,306: INFO/MainProcess] Received task: workflow.tasks.start_workflow[45b54d46-56cc-4c46-a126-d38ab8e8a2e8]
[2015-03-03 15:34:42,334: INFO/MainProcess] Received task: workflow.tasks.group_task[ccce5c5b-0946-499a-9879-613b79333419]
[2015-03-03 15:34:42,335: INFO/MainProcess] Received task: workflow.tasks.sample_task_2[3262ad97-c8ea-4b26-9bdc-f3a95fd41cf4]
[2015-03-03 15:34:42,335: INFO/MainProcess] Received task: workflow.tasks.sample_task_10[64286589-7665-4574-864a-69f3175ec281]
[2015-03-03 15:34:42,336: INFO/MainProcess] Received task: celery.chord_unlock[055938f3-5a4e-4c77-aa76-ab3399206c87] eta:[2015-03-03 15:34:43.335836+00:00]
[2015-03-03 15:34:42,363: INFO/MainProcess] Task workflow.tasks.start_workflow[45b54d46-56cc-4c46-a126-d38ab8e8a2e8] succeeded in 0.0562515768688s: None
[2015-03-03 15:34:42,391: INFO/MainProcess] Task workflow.tasks.group_task[ccce5c5b-0946-499a-9879-613b79333419] succeeded in 0.0555328750052s: True
[2015-03-03 15:34:43,402: INFO/MainProcess] Received task: celery.chord_unlock[055938f3-5a4e-4c77-aa76-ab3399206c87] eta:[2015-03-03 15:34:44.400298+00:00]
[2015-03-03 15:34:43,404: INFO/MainProcess] Task celery.chord_unlock[055938f3-5a4e-4c77-aa76-ab3399206c87] retry: Retry in 1s
[2015-03-03 15:34:45,323: INFO/MainProcess] Received task: celery.chord_unlock[055938f3-5a4e-4c77-aa76-ab3399206c87] eta:[2015-03-03 15:34:46.320054+00:00]
[2015-03-03 15:34:45,325: INFO/MainProcess] Task celery.chord_unlock[055938f3-5a4e-4c77-aa76-ab3399206c87] retry: Retry in 1s
[2015-03-03 15:34:47,299: INFO/MainProcess] Received task: celery.chord_unlock[055938f3-5a4e-4c77-aa76-ab3399206c87] eta:[2015-03-03 15:34:48.297891+00:00]
[2015-03-03 15:34:47,299: INFO/MainProcess] Task celery.chord_unlock[055938f3-5a4e-4c77-aa76-ab3399206c87] retry: Retry in 1s
[2015-03-03 15:34:47,390: INFO/MainProcess] Task workflow.tasks.sample_task_2[3262ad97-c8ea-4b26-9bdc-f3a95fd41cf4] succeeded in 5.05364968092s: True
[2015-03-03 15:34:47,392: INFO/MainProcess] Task workflow.tasks.sample_task_10[64286589-7665-4574-864a-69f3175ec281] succeeded in 5.05569092603s: True
[2015-03-03 15:34:48,426: INFO/MainProcess] Task celery.chord_unlock[055938f3-5a4e-4c77-aa76-ab3399206c87] succeeded in 0.0345057491213s: None
[2015-03-03 15:34:48,426: INFO/MainProcess] Received task: workflow.tasks.sample_task_3[89e6b3a6-1595-48e3-801d-28b36aafb581]
[2015-03-03 15:34:53,483: INFO/MainProcess] Received task: workflow.tasks.group_task[5e4f63b9-6968-4210-91f7-b89e939d1c9a]
[2015-03-03 15:34:53,484: INFO/MainProcess] Received task: workflow.tasks.sample_task_5[fc10ce62-5701-4c75-987e-7dac7b17bab6]
[2015-03-03 15:34:53,484: INFO/MainProcess] Received task: workflow.tasks.sample_task_7[9893dd87-844b-44a3-b5d8-bca086ee15ec]
[2015-03-03 15:34:53,485: INFO/MainProcess] Received task: celery.chord_unlock[018c1e9e-3b2e-4a4c-90ed-5265b01eb9fb] eta:[2015-03-03 15:34:54.484729+00:00]
[2015-03-03 15:34:53,490: INFO/MainProcess] Task workflow.tasks.sample_task_3[89e6b3a6-1595-48e3-801d-28b36aafb581] succeeded in 5.06310376804s: True
[2015-03-03 15:34:53,527: INFO/MainProcess] Task workflow.tasks.group_task[5e4f63b9-6968-4210-91f7-b89e939d1c9a] succeeded in 0.043258280959s: True
[2015-03-03 15:34:55,328: INFO/MainProcess] Received task: celery.chord_unlock[018c1e9e-3b2e-4a4c-90ed-5265b01eb9fb] eta:[2015-03-03 15:34:56.327396+00:00]
[2015-03-03 15:34:55,329: INFO/MainProcess] Task celery.chord_unlock[018c1e9e-3b2e-4a4c-90ed-5265b01eb9fb] retry: Retry in 1s
[2015-03-03 15:34:57,336: INFO/MainProcess] Received task: celery.chord_unlock[018c1e9e-3b2e-4a4c-90ed-5265b01eb9fb] eta:[2015-03-03 15:34:58.333722+00:00]
[2015-03-03 15:34:57,339: INFO/MainProcess] Task celery.chord_unlock[018c1e9e-3b2e-4a4c-90ed-5265b01eb9fb] retry: Retry in 1s
[2015-03-03 15:34:58,424: INFO/MainProcess] Received task: celery.chord_unlock[018c1e9e-3b2e-4a4c-90ed-5265b01eb9fb] eta:[2015-03-03 15:34:59.423050+00:00]
[2015-03-03 15:34:58,425: INFO/MainProcess] Task celery.chord_unlock[018c1e9e-3b2e-4a4c-90ed-5265b01eb9fb] retry: Retry in 1s
[2015-03-03 15:34:58,517: INFO/MainProcess] Received task: workflow.tasks.sample_task_8[aff7d810-9989-4dfe-8cca-1032efcf4624]
[2015-03-03 15:34:58,521: INFO/MainProcess] Received task: workflow.tasks.sample_task_6[b758014f-5837-4bed-9426-5c2e03af2c2f]
[2015-03-03 15:34:58,538: INFO/MainProcess] Task workflow.tasks.sample_task_7[9893dd87-844b-44a3-b5d8-bca086ee15ec] succeeded in 5.05185400695s: True
[2015-03-03 15:34:58,539: INFO/MainProcess] Task workflow.tasks.sample_task_5[fc10ce62-5701-4c75-987e-7dac7b17bab6] succeeded in 5.05522017297s: True
[2015-03-03 15:35:01,325: INFO/MainProcess] Received task: celery.chord_unlock[018c1e9e-3b2e-4a4c-90ed-5265b01eb9fb] eta:[2015-03-03 15:35:02.322996+00:00]
[2015-03-03 15:35:01,326: INFO/MainProcess] Task celery.chord_unlock[018c1e9e-3b2e-4a4c-90ed-5265b01eb9fb] retry: Retry in 1s
[2015-03-03 15:35:03,337: INFO/MainProcess] Received task: celery.chord_unlock[018c1e9e-3b2e-4a4c-90ed-5265b01eb9fb] eta:[2015-03-03 15:35:04.335374+00:00]
[2015-03-03 15:35:03,339: INFO/MainProcess] Task celery.chord_unlock[018c1e9e-3b2e-4a4c-90ed-5265b01eb9fb] retry: Retry in 1s
[2015-03-03 15:35:03,594: INFO/MainProcess] Task workflow.tasks.sample_task_6[b758014f-5837-4bed-9426-5c2e03af2c2f] succeeded in 5.0567153669s: True
[2015-03-03 15:35:03,595: INFO/MainProcess] Task workflow.tasks.sample_task_8[aff7d810-9989-4dfe-8cca-1032efcf4624] succeeded in 5.05580001394s: True
[2015-03-03 15:35:05,315: INFO/MainProcess] Task celery.chord_unlock[018c1e9e-3b2e-4a4c-90ed-5265b01eb9fb] succeeded in 0.0105995119084s: None
[2015-03-03 15:35:05,316: INFO/MainProcess] Received task: workflow.tasks.sample_task_9[2492e5e0-d6df-402c-b5a5-ab15d99b42ad]
[2015-03-03 15:35:10,336: INFO/MainProcess] Received task: workflow.tasks.end_workflow[4a2c0a15-77c9-417e-bd21-8a7f1d248981]
[2015-03-03 15:35:10,357: INFO/MainProcess] Task workflow.tasks.sample_task_9[2492e5e0-d6df-402c-b5a5-ab15d99b42ad] succeeded in 5.04111725814s: True
[2015-03-03 15:35:10,374: INFO/MainProcess] Task workflow.tasks.end_workflow[4a2c0a15-77c9-417e-bd21-8a7f1d248981] succeeded in 0.0367547438946s: None
4

1 回答 1

0

I don't remember exactly where that identifier gets generated, but celery does generate task identifiers for internal use depending on what canvas primitives are used. The biggest problem I ran into was the chord_unlock tasks, which you have no control over.

This won't help you in the short term, but I created a patch a long time ago that allows you to pass a 'root_id' argument when creating a workflow that will then get propagated to all children tasks and can be used to link them all together like you describe. It will also include a 'parent_id' field to further assist with traversing/tracking workflows as well.

The idea was accepted, but the code hasn't landed in an official release yet. It is supposed to land in the next major release v3.2.0 and you can monitor it here: https://github.com/celery/celery/pull/1318

于 2015-03-04T04:01:04.420 回答