我的代码如下所示:
def etl():
for item in ['FIRST','SECCOND','THIRD']:
if item == 'a':
requests = ['Data1','Data3']
elif item == 'b':
requests = ['Data1']
for data_name in requests:
@task(task_id=f'{item}_{data_name}_task_a')
def taska():
a,b = some_func
vars_dict = {'a': a,
'b': b}
return vars_dict
@task(task_id=f'{account}_{data_name}_get_liveops_data')
def taskb(vars_dict):
some_other_func
return True
if data_name=='Data1':
@task(task_id='last_task')
def last_task(success):
dim_experiments.main()
return
vars_dict = taska()
success = taskb(vars_dict)
last_task(success)
myc_dag = etl()
目标是last_task
依赖taska
和taskb
除那个taska
和taskb
那个下载Data3
请求。我无法使用它来实现它TaskFlow API