16

我正在学习气流并且有一个简单的问题。下面是我的 DAG,名为dog_retriever

import airflow
from airflow import DAG
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.operators.sensors import HttpSensor
from datetime import datetime, timedelta
import json



default_args = {
    'owner': 'Loftium',
    'depends_on_past': False,
    'start_date': datetime(2017, 10, 9),
    'email': 'rachel@loftium.com',
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=3),
}

dag = DAG('dog_retriever',
    schedule_interval='@once',
    default_args=default_args)

t1 = SimpleHttpOperator(
    task_id='get_labrador',
    method='GET',
    http_conn_id='http_default',
    endpoint='api/breed/labrador/images',
    headers={"Content-Type": "application/json"},
    dag=dag)

t2 = SimpleHttpOperator(
    task_id='get_breeds',
    method='GET',
    http_conn_id='http_default',
    endpoint='api/breeds/list',
    headers={"Content-Type": "application/json"},
    dag=dag)
    
t2.set_upstream(t1)

作为测试 Airflow 的一种方法,我只是在这个非常简单的http://dog.ceo API 中向某些端点发出了两个 GET 请求。目标是学习如何处理通过 Airflow 检索到的一些数据

执行正在运行——我的代码成功调用了任务 t1 和 t2 中的端点,我可以看到它们以基于set_upstream我编写的规则的正确顺序记录在 Airflow UI 中。

我不知道如何访问这两个任务的 JSON 响应。这似乎很简单,但我无法弄清楚。在 SimpleHtttpOperator 中,我看到了response_check的参数,但没有任何东西可以简单地打印、存储或查看 JSON 响应。

谢谢。

4

2 回答 2

23

因此,由于这是 SimpleHttpOperator,并且实际的 json 被推送到 XCOM,您可以从那里获取它。这是该操作的代码行:https ://github.com/apache/incubator-airflow/blob/master/airflow/operators/http_operator.py#L87

您需要做的是 set xcom_push=True,因此您的第一个 t1 将如下所示:

t1 = SimpleHttpOperator(
    task_id='get_labrador',
    method='GET',
    http_conn_id='http_default',
    endpoint='api/breed/labrador/images',
    headers={"Content-Type": "application/json"},
    xcom_push=True,
    dag=dag)

您应该能够在 XCOM 中找到所有 JSON,return valueXCOM 的更多详细信息可以在以下位置找到:https ://airflow.incubator.apache.org/concepts.html#xcoms

于 2017-10-11T03:36:30.283 回答
9

我主要为尝试(或想要)从流程中调用Airflow 工作流 DAG 并接收DAG 活动产生的任何数据的任何人添加此答案。

重要的是要了解运行 DAG 需要 HTTP POST,并且对该 POST 的响应在 Airflow 中是硬编码的,即,如果不更改 Airflow 代码本身,Airflow 将永远不会返回除了状态代码和消息之外的任何内容给请求过程。

Airflow 似乎主要用于为 ETL(提取、转换、加载)工作流创建数据管道,现有的Airflow Operators,例如 SimpleHttpOperator,可以从 RESTful Web 服务获取数据,对其进行处理,然后使用其他运算符将其写入数据库,但是不要在对运行工作流 DAG 的 HTTP POST 的响应中返回它。

即使操作员确实在响应中返回了这些数据,查看 Airflow 源代码也可以确认 trigger_dag() 方法不会检查或返回它:

apache_airflow_airflow_www_api_experimental_endpoints.py

apache_airflow_airflow_api_client_json_client.py

它返回的只是这条确认消息:

编排服务中收到 Airflow DagRun 消息

由于 Airflow 是开源的,我想我们可以修改 trigger_dag() 方法以返回数据,但是我们将被困在维护分叉的代码库中,并且我们将无法使用云托管的基于 Airflow 的服务,例如Google Cloud Platform 上的 Cloud Composer,因为它不包含我们的修改。

更糟糕的是,Apache Airflow 甚至没有正确返回其硬编码的状态消息。

当我们成功POST到 Airflow/dags/{DAG-ID}/dag_runs端点时,我们会收到“200 OK”响应,而不是我们应该的“201 Created”响应。并且 Airflow 使用其“已创建……”状态消息“硬编码”响应的内容正文。 然而,标准是在响应头中返回新创建资源的 Uri,而不是在正文中……这将使正文可以自由地返回在此创建期间(或由此产生的)生​​成/聚合的任何数据。

我将此缺陷归因于“盲目”(或我称之为“幼稚”)敏捷/MVP 驱动的方法,它只添加了需要的功能,而不是保持了解并为更通用的实用程序留出空间。由于Airflow 主要用于为(和由)数据科学家(而非软件工程师)创建数据管道,因此Airflow 操作员可以使用其专有的内部 XCom 功能相互共享数据,正如@Chengzhi 的有用回答所指出的那样(谢谢!)但在任何情况下都不能将数据返回给请求者启动了 DAG,即 SimpleHttpOperator 可以从第三方 RESTful 服务检索数据,并可以与丰富、聚合和/或转换它的 PythonOperator(通过 XCom)共享该数据。然后,PythonOperator 可以与 PostgresOperator 共享其数据,后者将结果直接存储在数据库中。但是结果永远无法返回到请求完成工作的进程,即我们的编排服务,这使得 Airflow 对任何用例都无用,但由当前用户驱动的用例。

这里的要点(至少对我而言)是 1)永远不要将过多的专业知识归于任何人或任何组织。Apache 是一个重要的组织,在软件开发方面有着深厚而重要的根基……但它们并不完美。并且 2) 始终提防内部的专有解决方案。开放的、基于标准的解决方案已经从许多不同的角度进行了检查和审查,而不仅仅是一个角度。

我失去了将近一周的时间来寻找不同的方法来做一件看似非常简单和合理的事情。我希望这个答案可以为其他人节省一些时间。

于 2019-08-16T22:37:28.263 回答