1

我正在尝试找到一种方法来更新开始用于调度流程的每次迭代的参数。

例如,假设我有一个计划在一年的每个星期一运行一次的流程。对于第一个星期一,流程需要以 5 的参数运行。下星期一需要以 7 的参数运行,以此类推。每周运行所需的参数将改变一个常数。

根据文档,看起来我可以为每次运行创建一个带有相应参数的时钟,但对于为许多运行安排的流来说,这似乎过多。

在 Prefect 中是否有更简单的方法?

4

1 回答 1

0

这听起来像是您想将某种形式的业务逻辑封装到您的计算中,这是向 FlowParameter添加新的一个很好的用例:Task

import prefect
from prefect import task, Flow, Parameter


varying_param = Parameter("param", default=None) # none as the default

@task(name="Varying Parameter")
def param_calculation(p):
    time = prefect.context.scheduled_start_time 

    # if a value was provided, use it
    if p is not None:
        return p
    
    # do some calculations to decide what value is appropriate
    # and return it


with Flow("Minimal example") as flow:
    param_value = param_calculation(varying_param)
    # now use this value in downstream tasks
于 2020-11-13T03:16:57.970 回答