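Prefect makes no implicit assumptions about how your flows or their tasks depend on time, so how you perform a backfill depends on the structure of your flow. There are typically two ways in which time explicitly affects a flow: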
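- through a `Parameter` or `DateTimeParameter`
- through `prefect.context` (which includes a number of time-related fields, described in Prefect's documentation for `prefect.context`)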
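Given this, a backfill can be accomplished by creating any number of ad-hoc flow runs that override the appropriate context key or default parameter value. (Note that ad-hoc runs can be created for any flow, regardless of whether that flow has a schedule.)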
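To make this more concrete, here are two examples that each trigger a single backfill run (to create more runs, loop over the appropriate values and create one run for each):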
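The "loop over the appropriate values" step can be sketched with the standard library alone. This is a minimal sketch assuming one backfill run per day over a closed date range; `backfill_dates` is a hypothetical helper, not part of Prefect. Each yielded string is the kind of value you would pass as `backfill_time` or `current_time` in the examples that follow.

```python
from datetime import date, timedelta
from typing import Iterator


def backfill_dates(start: date, end: date, step_days: int = 1) -> Iterator[str]:
    """Yield ISO-8601 date strings from start to end, inclusive (hypothetical helper)."""
    if step_days < 1:
        raise ValueError("step_days must be at least 1")
    current = start
    while current <= end:
        yield current.isoformat()
        current += timedelta(days=step_days)


# One context override per ad-hoc run; a parameters dict would be built the same way.
contexts = [{"backfill_time": d} for d in backfill_dates(date(1986, 1, 1), date(1986, 1, 3))]
print(contexts)
# [{'backfill_time': '1986-01-01'}, {'backfill_time': '1986-01-02'}, {'backfill_time': '1986-01-03'}]
```

Producing plain strings keeps the values JSON-friendly, which matters when they are sent through the API rather than passed to a local `flow.run`.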
Using context
```python
import pendulum
import prefect
from prefect import Flow


@prefect.task
def do_something_time_specific():
    """
    This task uses the timestamp provided to the custom `backfill_time`
    context key; if that does not exist, it falls back on the built-in
    `scheduled_start_time` context key.
    """
    current_time = prefect.context.get("backfill_time") or prefect.context.get("scheduled_start_time")
    if isinstance(current_time, str):
        current_time = pendulum.parse(current_time)
    # performs some action dealing with this timestamp


flow = Flow("backfill", tasks=[do_something_time_specific])

## using Core
flow.run()  # will use current timestamp
flow.run(context={"backfill_time": "1986-01-02"})  # will use old timestamp

## using an API ("FLOWID" is the ID of the registered flow)
prefect.Client().create_flow_run(
    flow_id="FLOWID",
    context={"backfill_time": "1986-01-02"},
)  # will use old timestamp
```
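The fallback inside `do_something_time_specific` is just a key lookup with precedence. The sketch below reproduces it with a plain `dict` standing in for `prefect.context` and `datetime.fromisoformat` swapped in for `pendulum.parse` (both assumptions so it runs without Prefect or pendulum); `resolve_run_time` is a hypothetical name. It makes it easy to check that an explicit `backfill_time` always wins over `scheduled_start_time`.

```python
from datetime import datetime, timezone
from typing import Any, Mapping, Optional


def resolve_run_time(context: Mapping[str, Any]) -> Optional[datetime]:
    """Prefer the custom backfill_time key, else fall back to scheduled_start_time."""
    value = context.get("backfill_time") or context.get("scheduled_start_time")
    if isinstance(value, str):
        # Strings such as "1986-01-02" are parsed; pendulum.parse fills this role above.
        value = datetime.fromisoformat(value)
    return value


scheduled = datetime(2024, 5, 1, tzinfo=timezone.utc)
print(resolve_run_time({"scheduled_start_time": scheduled}))  # -> 2024-05-01 00:00:00+00:00
print(resolve_run_time({"backfill_time": "1986-01-02", "scheduled_start_time": scheduled}))  # -> 1986-01-02 00:00:00
```

Because `or` is used, an empty string for `backfill_time` also falls through to the scheduled time, matching the behavior of the original task.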
Using parameters
```python
import pendulum
import prefect
from prefect import Flow

# required=False keeps the parameter optional; in some Prefect versions a
# Parameter whose default is None is otherwise treated as required
current_time = prefect.Parameter("current_time", default=None, required=False)


@prefect.task
def do_something_time_specific(current_time):
    """
    This task uses the timestamp provided to the task explicitly.
    """
    current_time = current_time or pendulum.now("utc")  # uses "now" if not provided
    if isinstance(current_time, str):
        current_time = pendulum.parse(current_time)
    # performs some action dealing with this timestamp


with Flow("backfill") as flow:
    do_something_time_specific(current_time)

## using Core
flow.run()  # will use current timestamp
flow.run(parameters={"current_time": "1986-01-02"})  # will use old timestamp

## using an API ("FLOWID" is the ID of the registered flow)
prefect.Client().create_flow_run(
    flow_id="FLOWID",
    parameters={"current_time": "1986-01-02"},
)  # will use old timestamp
```
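To turn the single API call into a full backfill, loop over the values and create one run per value. In this minimal sketch `submit_backfill` is a hypothetical helper, and the `create_flow_run` callable is injected so the loop can be exercised with a stand-in; in practice you would pass `prefect.Client().create_flow_run`, using the same `flow_id` and `parameters` keywords as the call above.

```python
from typing import Any, Callable, Iterable, List


def submit_backfill(
    create_flow_run: Callable[..., Any],
    flow_id: str,
    values: Iterable[str],
    parameter_name: str = "current_time",
) -> List[Any]:
    """Create one ad-hoc flow run per backfill value and collect whatever each call returns."""
    run_ids = []
    for value in values:
        run_ids.append(create_flow_run(flow_id=flow_id, parameters={parameter_name: value}))
    return run_ids


# Stand-in for prefect.Client().create_flow_run that records each call it receives.
calls = []


def fake_create_flow_run(**kwargs: Any) -> str:
    calls.append(kwargs)
    return f"run-{len(calls)}"


run_ids = submit_backfill(fake_create_flow_run, "FLOWID", ["1986-01-01", "1986-01-02"])
print(run_ids)  # -> ['run-1', 'run-2']
```

Injecting the callable keeps the loop independent of where runs are created, so the same helper works for a dry run that only logs what would be submitted.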
Newer parameter classes such as `DateTimeParameter` provide stronger type guarantees, but hopefully this demonstrates the idea.
Edit: for completeness, note that in Core you can create ad-hoc runs of a flow that has a schedule by calling `flow.run(run_on_schedule=False)`.