我对气流相当陌生,我目前正试图在我的 SimpleHttpOperators 之间传递信息。
这是检索数据的地方:
request_city_information = SimpleHttpOperator(
http_conn_id='overpass',
task_id='basic_city_information',
headers={"Content-Type": "application/x-www-form-urlencoded"},
method='POST',
data=f'[out:json]; node[name={name_city}][capital]; out center;',
response_filter=lambda response: response.json()['elements'][0],
dag=dag,)
然后我想在以下运算符中使用来自此的响应:
request_city_attractions = SimpleHttpOperator(
http_conn_id='overpass',
task_id='city_attractions',
headers={"Content-Type": "application/x-www-form-urlencoded"},
method='POST',
data=f"[out:json];(nwr[tourism='attraction'][wikidata](around:{search_radius},"
f"{request_city_information.xcom_pull(context='ti')['lat']}"
f",10););out body;>;out skel qt;",
dag=dag)
如您所见,我尝试通过request_city_information.xcom_pull(context='ti')
. 但是,我的上下文在这里似乎是错误的。
由于我的数据已经写入 XComs,因此我认为不需要XCOM_push='True'
,正如这里所建议的那样。
自气流 2.x 以来,XCom 似乎发生了变化,因为我发现许多建议的解决方案对我不起作用。
我相信我的思维过程中存在重大差距,我只是不知道在哪里。
我将不胜感激任何参考示例或帮助!提前致谢