0

我想创建一个将动态流设置为依赖于另一个动态流的流。

下面是我想做的。

我注册了 N 个数据流。

@task
def download(data):
    pass
@task
def extract(data):
    pass
@task
def preprocess(data):
    pass

@task
def dump_data(data):
    pass 

for i in range(N):
   with Flow(f"data_{i}") as f:
       download_task = download(i)
       extract_task = extract(download_task)
       preprocess_task = preprocess(extract_task)
       dump_task = dump_data(preprocess_task)
   f.register()

我也注册了 M 个模型。

for i in range(M):
    Flow(f"model_{i}") as f:
       dependent_data_list_task = get_dependent_data_list(f"module_{i}")
       read_data(path)

模型可以依赖于任何模型

model_1(upstream = [data_1, data_2, data_50])
model_2(upstream = [data_3, data_60])
...

我试图弄清楚如何做到这一点。

我尝试在为模型创建流程的地方设置依赖项。这导致我多次运行数据。

我希望数据只运行一次。

4

0 回答 0