TL;博士;
我无法使用 prefect 的FlowRunner来解决上述问题。我可能要么用错了(见下文),要么错过了一些东西。真的很感激任何指点!
问题
我通读了优秀的核心文档,发现处理失败和本地调试的部分与此最相关(可能遗漏了一些东西!)。FlowRunner类似乎(对我来说)是解决方案。
看看我是否可以使用 Flow Runner 来恢复失败的流程:
- 进行了失败的流程运行:
from time import sleep
import prefect
from prefect import Flow, task
@task
def success():
sleep(3)
return
@task
def failure():
return 1 / 0
def get_flow_runner():
with Flow("Success/Failure") as flow:
success()
failure()
return prefect.engine.FlowRunner(flow)
- 在 iPython 中运行它并保存状态:
In [1]: run nameofscript.py
In [2]: flow_runner = get_flow_runner()
In [3]: state = flow_runner.run()
将 1 / 0 替换为 1 / 1
failure()
这样任务就会成功:最后将先前的状态传递给
flow_runner
希望它会恢复流程:
In [1]: run nameofscript.py
In [2]: flow_runner = get_flow_runner()
In [3]: flow_runner.run(task_states=state.result)
整个流程再次运行,包括 3 秒成功的任务。