我的流程很长,我想将其状态标记为Failed
特定任务失败。
我在这里阅读了文档,但是他们似乎没有指定这种情况。我的猜测是我需要在@task()
装饰器中以某种方式具体说明这一点,但我不知道如何。
非常感谢任何指导。
这可以通过将您的特定任务声明为 Flow 的“参考任务”来实现。 参考任务是最终确定每个流程运行的最终状态的任务。
例如,以下代码片段创建了一个流程,其中包含两个独立的、不相关的任务,这些任务随机失败了一半。但是,整体流运行状态将仅根据 的状态确定task_one
,因为我们将该任务指定为唯一的参考任务:
import random
from prefect import task, Flow
@task
def task_one():
if random.random() > 0.5:
raise ValueError("Random failure")
@task
def task_two():
if random.random() > 0.5:
raise ValueError("Random failure")
flow = Flow("Two Task Flow", tasks=[task_one, task_two])
flow.set_reference_tasks([task_one])