我有一个用例,其中有一个客户列表。客户端可以从列表中添加或删除,它们可以有不同的开始日期和不同的初始参数。
我想使用气流根据每个客户端的初始开始日期回填所有数据+如果出现故障则重新运行。我正在考虑为每个客户创建一个 SubDag。这会解决我的问题吗?
如何根据 client_id 动态创建 SubDag?
我有一个用例,其中有一个客户列表。客户端可以从列表中添加或删除,它们可以有不同的开始日期和不同的初始参数。
我想使用气流根据每个客户端的初始开始日期回填所有数据+如果出现故障则重新运行。我正在考虑为每个客户创建一个 SubDag。这会解决我的问题吗?
如何根据 client_id 动态创建 SubDag?
您绝对可以动态创建 DAG 对象:
def make_client_dag(parent_dag, client):
return DAG(
'%s.client_%s' % (parent_dag.dag_id, client.name),
start_date = client.start_date
)
然后,您可以在主 dag 的 SubDagOperator 中使用该方法:
for client in clients:
SubDagOperator(
task_id='client_%s' % client.name,
dag=main_dag,
subdag = make_client_dag(main_dag, client)
)
这将为集合的每个成员创建一个特定的 subdag,每个成员clients
都将在下一次调用主 dag 时运行。我不确定你是否会得到你想要的回填行为。