如何将函数参数设置为从先前运行的任务/函数返回的任务。请注意,这些任务是以编程方式定义的,因此我不能简单地使用xcom_pull(task_id="some_task")
,因为这些任务是在循环中定义的(如下所示):
def scrape(site):
return requests.get(site).content
def echo(**kwargs):
ti = kwargs['ti']
# how can I use the input from `scrape()` here?
for idx, site in enumerate(sites):
myop = PythonOperator(task_id='scape_%s' % str(idx), python_callable=scrape, op_args=[site], dag=dag)
echo_op = PythonOperator(task_id='echo_%s' % str(idx), dag=dag, provide_context=True, python_callable=echo)
myop.set_downstream(echo_op)