
Using Prefect, I would like to create a new flow from two other flows.


The error I am getting is: A task with the slug "add_num" already exists in this flow. Is it possible to update Flows that use the same tasks or Parameters? Below is a minimal example of what I am trying to accomplish.

from prefect import task, Flow, Parameter

@task
def add_one(x):
    return x+1

with Flow("Flow 1") as flow_1:
    add_num = Parameter("add_num", default=10)
    new_num1 = add_one(add_num)

@task
def add_two(y):
    return y+2

with Flow("Flow 2") as flow_2:
    add_num = Parameter("add_num", default=10)
    new_num2 = add_two(add_num)

combo_fl = Flow("Add Numbers")

combo_fl.update(flow_1)
combo_fl.update(flow_2, validate=False)

I did see this piece of code in the Slack channel that might help solve this problem, but I am not sure how to use it.

import uuid

class GlobalParameter(Parameter):
    def __init__(self, name, slug=None, *args, **kwargs):
        super().__init__(name=name, *args, **kwargs)
        self.slug = slug or uuid.uuid4()

Thanks in advance.

1 Answer

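Because Parameters are uniquely identified by name in the API, you cannot combine two flows that contain different Parameters with the same name. What you can do, however, is use one common Parameter in each flow, like this: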

from prefect import task, Flow, Parameter

## initialize the Parameter outside of any
## Flow context

add_num = Parameter("add_num", default=10)

@task
def add_one(x):
    return x+1

with Flow("Flow 1") as flow_1:
    new_num1 = add_one(add_num)

@task
def add_two(y):
    return y+2

with Flow("Flow 2") as flow_2:
    new_num2 = add_two(add_num)

combo_fl = Flow("Add Numbers")

combo_fl.update(flow_1)
combo_fl.update(flow_2, validate=False)

Because the Parameter being used is actually the same instance of the Parameter class in both flows, your update will succeed.
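To make the "same instance" point concrete, here is a small library-free sketch. It is only a toy model: `ToyTask` and `ToyFlow` are hypothetical stand-ins, not Prefect classes, and the duplicate-slug check is an assumption modeled on the error message from the question rather than on Prefect's actual source.

```python
class ToyTask:
    """Hypothetical stand-in for a task or Parameter (not a Prefect class)."""

    def __init__(self, slug):
        self.slug = slug


class ToyFlow:
    """Hypothetical stand-in for a flow that keeps its tasks in a set."""

    def __init__(self, name, tasks=()):
        self.name = name
        self.tasks = set()
        for task in tasks:
            self.add_task(task)

    def add_task(self, task):
        # Plain objects hash and compare by identity, so adding the
        # very same object a second time is a no-op.
        if task in self.tasks:
            return
        # A *different* object that reuses an existing slug is a conflict.
        if any(existing.slug == task.slug for existing in self.tasks):
            raise ValueError(
                f'A task with the slug "{task.slug}" already exists in this flow.'
            )
        self.tasks.add(task)

    def update(self, other):
        # Merge every task of the other flow into this one.
        for task in other.tasks:
            self.add_task(task)


# Case 1: both flows share ONE instance, so the merge succeeds.
shared = ToyTask("add_num")
combo = ToyFlow("Add Numbers")
combo.update(ToyFlow("Flow 1", [shared]))
combo.update(ToyFlow("Flow 2", [shared]))
shared_count = len(combo.tasks)

# Case 2: each flow builds its OWN instance with the same slug.
clash = ToyFlow("Add Numbers")
clash.update(ToyFlow("Flow 1", [ToyTask("add_num")]))
try:
    clash.update(ToyFlow("Flow 2", [ToyTask("add_num")]))
    conflict = False
except ValueError:
    conflict = True
```

In this model the first merge ends up with a single `add_num` entry, while the second raises on the duplicate slug, which mirrors the difference between the question's code and the version above.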

Answered 2020-03-14T17:48:53.020