在完美的工作流程中,我试图保留每个计划运行的数据。我需要比较每个以前和当前结果的数据。我尝试了 Localresult 和 checkpoint=true 但它不起作用。例如,
from prefect import Flow, task
from prefect.engine.results import LocalResult
from prefect.schedules import IntervalSchedule
from datetime import timedelta, datetime
import os
import prefect
@task("func_task_target.txt", checkpoint=True, result=LocalResult(dir="~/.prefect"))
def file_scan():
files = os.listdir(test)
#prefect.context.a = files
return files
schedule = IntervalSchedule(interval=timedelta(seconds=61))
with Flow("Test persist data", schedule) as flow:
a = file_scan()
flow.run()
我的流程安排为每 61 秒/每分钟一次。在第一次运行时,我可能会得到空结果,但对于第二次计划运行,我应该得到以前的流结果进行比较。谁能帮我实现这一目标?谢谢!