1

我对气流相当陌生,我目前正试图在我的 SimpleHttpOperators 之间传递信息。

这是检索数据的地方:

request_city_information = SimpleHttpOperator(
http_conn_id='overpass',
task_id='basic_city_information',
headers={"Content-Type": "application/x-www-form-urlencoded"},
method='POST',
data=f'[out:json]; node[name={name_city}][capital]; out center;',
response_filter=lambda response: response.json()['elements'][0],
dag=dag,)

然后我想在以下运算符中使用来自此的响应:

request_city_attractions = SimpleHttpOperator(
http_conn_id='overpass',
task_id='city_attractions',
headers={"Content-Type": "application/x-www-form-urlencoded"},
method='POST',
data=f"[out:json];(nwr[tourism='attraction'][wikidata](around:{search_radius},"
     f"{request_city_information.xcom_pull(context='ti')['lat']}"
     f",10););out body;>;out skel qt;",
dag=dag)

如您所见,我尝试通过request_city_information.xcom_pull(context='ti'). 但是,我的上下文在这里似乎是错误的。

由于我的数据已经写入 XComs,因此我认为不需要XCOM_push='True',正如这里所建议的那样。

自气流 2.x 以来,XCom 似乎发生了变化,因为我发现许多建议的解决方案对我不起作用。
我相信我的思维过程中存在重大差距,我只是不知道在哪里。

我将不胜感激任何参考示例或帮助!提前致谢

4

1 回答 1

0

我现在用一种完全不同的方法解决了这个问题,如果你们知道第一个是如何工作的,我会很乐意对此进行解释。

这是我的解决方案:

with DAG(
    'city_info',
    default_args=dafault_args,
    description='xcom test',
    schedule_interval=None,
) as dag:
#TODO: Tasks with conn_id
def get_city_information(**kwargs):
    payload = f'[out:json]; node[name={name_city}][capital]; out center;'
    #TODO: Request als Connection
    r = requests.post('https://overpass-api.de/api/interpreter', data=payload)
    ti = kwargs['ti']
    ti.xcom_push('basic_city_information', r.json())


get_city_information_task = PythonOperator(
    task_id='get_city_information_task',
    python_callable=get_city_information
)


def get_city_attractions(**kwargs):
    ti = kwargs['ti']
    city_information = ti.xcom_pull(task_ids='get_city_information_task', key='basic_city_information')
    payload = f"[out:json];(nwr[tourism='attraction'][wikidata](around:{search_radius}" \
              f",{city_information['elements'][0]['lat']},{city_information['elements'][0]['lon']}" \
              f"););out body;>;out skel qt;"
    r = requests.post('https://overpass-api.de/api/interpreter', data=payload)
    #TODO: Json as Object
    ti.xcom_push('city_attractions', r.json())


get_city_attractions_task = PythonOperator(
    task_id='get_city_attractions_task',
    python_callable=get_city_attractions
)

get_city_information_task >> get_city_attractions_task
于 2021-01-31T19:16:20.457 回答