我想创建一个将动态流设置为依赖于另一个动态流的流。
下面是我想做的。
我注册了 N 个数据流。
@task
def download(data):
pass
@task
def extract(data):
pass
@task
def preprocess(data):
pass
@task
def dump_data(data):
pass
for i in range(N):
with Flow(f"data_{i}") as f:
download_task = download(i)
extract_task = extract(download_task)
preprocess_task = preprocess(extract_task)
dump_task = dump_data(preprocess_task)
f.register()
我也注册了 M 个模型。
for i in range(M):
Flow(f"model_{i}") as f:
dependent_data_list_task = get_dependent_data_list(f"module_{i}")
read_data(path)
模型可以依赖于任何模型
model_1(upstream = [data_1, data_2, data_50])
model_2(upstream = [data_3, data_60])
...
我试图弄清楚如何做到这一点。
我尝试在为模型创建流程的地方设置依赖项。这导致我多次运行数据。
我希望数据只运行一次。