I would like to reuse the same parameterized Prefect flow, but with a schedule that varies depending on the input. For example:

from prefect import task, Flow, Task, Parameter
from prefect.schedules import CronSchedule

diurnal   = ['rooster', 'dog']
nocturnal = ['owl', 'hamster']

# Schedules
diurnal_schedule   = CronSchedule("50 7 * * mon,wed")
nocturnal_schedule = CronSchedule("15 12 * * tue,thu")

# Flow is common to both types, though with different schedules.
with Flow(name="wakuptime") as this_flow:
    animals         = Parameter("animals")
    wakeup(animals)

# Desired behaviour (pseudocode, not valid Python):
#   this_flow.run(parameters=dict(animals=diurnal))   on diurnal_schedule
#   this_flow.run(parameters=dict(animals=nocturnal)) on nocturnal_schedule

Any suggestions?

1 Answer

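This is now possible on Prefect's master branch and will be released next week in 0.9.2: the Clocks API now lets you provide parameters for each clock, and those parameters are passed to every flow run generated from that clock. In your case, you could do the following: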

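from prefect import task, Flow, Parameter
from prefect.schedules import clocks, Schedule

# Placeholder task: the original post does not show how wakeup is defined.
@task
def wakeup(animals):
    for animal in animals:
        print(f"Waking up {animal}")

diurnal   = ['rooster', 'dog']
nocturnal = ['owl', 'hamster']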

# Clocks
diurnal_clock   = clocks.CronClock("50 7 * * mon,wed", parameter_defaults={"animals": diurnal})
nocturnal_clock = clocks.CronClock("15 12 * * tue,thu", parameter_defaults={"animals": nocturnal})

# the full schedule
schedule = Schedule(clocks=[diurnal_clock, nocturnal_clock])

# Flow is common to both types, though with different schedules.
with Flow(name="wakuptime", schedule=schedule) as this_flow:
    animals = Parameter("animals", default=[])
    wakeup(animals)

# will run on the schedule with varying parameter values
this_flow.run()
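
To make the idea of per-clock parameters concrete, here is a minimal standard-library sketch of the concept. It is not Prefect's implementation: `WeeklyClock` and `next_runs` are hypothetical names invented for illustration, and the clock only understands "these weekdays at this time" rather than full cron syntax. Each clock emits events that carry its own parameter defaults, and the schedule is just the time-ordered merge of those events.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from heapq import merge
from itertools import islice
from typing import Any, Dict, Iterator, List, Tuple

Event = Tuple[datetime, Dict[str, Any]]


@dataclass
class WeeklyClock:
    """Hypothetical stand-in for a clock: fires on given weekdays at a fixed time."""
    weekdays: Tuple[int, ...]  # Monday == 0
    hour: int
    minute: int
    parameter_defaults: Dict[str, Any] = field(default_factory=dict)

    def events(self, after: datetime) -> Iterator[Event]:
        # Walk forward one day at a time, yielding matching days after `after`.
        day = after.replace(hour=self.hour, minute=self.minute,
                            second=0, microsecond=0)
        while True:
            if day > after and day.weekday() in self.weekdays:
                yield day, self.parameter_defaults
            day += timedelta(days=1)


def next_runs(clocks: List[WeeklyClock], after: datetime, n: int) -> List[Event]:
    """Merge the clocks' event streams by time and return the next n events."""
    streams = [clock.events(after) for clock in clocks]
    return list(islice(merge(*streams, key=lambda event: event[0]), n))


# Same weekdays and times as the cron strings "50 7 * * mon,wed" and "15 12 * * tue,thu".
diurnal_clock = WeeklyClock((0, 2), 7, 50, {"animals": ["rooster", "dog"]})
nocturnal_clock = WeeklyClock((1, 3), 12, 15, {"animals": ["owl", "hamster"]})

# 2020-01-27 is a Monday.
runs = next_runs([diurnal_clock, nocturnal_clock], datetime(2020, 1, 27), 4)
for when, params in runs:
    print(when.isoformat(), params)
```

Each upcoming run is paired with the parameters of the clock that produced it, which is the behaviour the answer describes for `parameter_defaults`.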
Answered 2020-01-28T19:50:01.850