3

我想触发一个简单的httpoperator,像这样:airflow trigger_dag test_trigger --conf '{"name":"something"}'

然后我使用 pythonoperator python_callable 通过使用 kwargs['dag_run'].conf 来接受参数,我想将 ['dag_run'].conf 传递给 simplehttpoperator,我该怎么做?有人可以帮忙吗?

cc_ = {}


def run_this_func(ds, **kwargs):
    cc_ = kwargs['dag_run'].conf
    logging.info(cc_)
    return cc_

run_this = PythonOperator(
    task_id='run_this',
    provide_context=True,
    python_callable=run_this_func,
    dag=dag)

http_task = SimpleHttpOperator(
    task_id='http_task',
    http_conn_id='test_http',
    method='POST',
    endpoint='/api/v1/function',
    data=cc_,
    headers={"Authorization": "Basic YWRtaW46MTIzNDU2", "Accept": "application/json, text/plain, */*"},
    response_check=lambda response: True if "10000" in response.content else False,
    dag=dag)

http_task.set_upstream(run_this)
4

3 回答 3

2

感谢@Chengzhi 和@Daniel。最后,我在 Jinja2/filter.py 中编写了一个自定义过滤器“tojson”,因为在气流中,默认的 Jinja2 版本是 2.8.1,而 Jinja2 直到 2.9 版本才包含名为“tojson”的内置过滤器。

def do_tojson(value):
    value = json.JSONEncoder().encode(value)
    return value

在 dag 文件中,代码如下。有用。

def run_this_func(ds, **kwargs):
    cc_ = kwargs['dag_run'].conf
    return cc_

run_this = PythonOperator(
    task_id='run_this',
    provide_context=True,
    python_callable=run_this_func,
    dag=dag)

http_task = SimpleHttpOperator(
    task_id='http_task',
    http_conn_id='test_http',
    method='POST',
    endpoint='/api/v1/task',
    data="{{ task_instance.xcom_pull(task_ids='run_this') |tojson}}",
    headers={"Authorization": "Basic YWRtaW46MTIzNDU2", "Accept": "application/json, text/plain, */*",
             "Content-Type": "application/json"},
    response_check=lambda response: True if "10000" in response.content else False,
    dag=dag)

http_task.set_upstream(run_this)
于 2017-09-14T07:11:49.680 回答
0

您不需要单独的 PythonOperator 来收集数据。我创建了一个自定义运算符,它可以接受数据(从 API 传递)并将其传递以发送 GET?POST 请求。这是链接:https ://stackoverflow.com/a/69443084/8081381

于 2021-10-04T22:42:59.957 回答
0

对于任务之间的通信,您可能需要查看 XCOM,https: //airflow.incubator.apache.org/concepts.html#xcoms

*****更新*****
(感谢 Daniel 提供更多详细信息)下面是一些您可以尝试的代码,在您的 SimpleHttpOperator 中您可以通过 XCOM 获得返回值:

http_task = SimpleHttpOperator(
    task_id='http_task',
    http_conn_id='test_http',
    method='POST',
    endpoint='/api/v1/function',
    data=json.loads("{{ task_instance.xcom_pull(task_ids='run_this', key='return_value') }}"),
    headers={"Authorization": "Basic YWRtaW46MTIzNDU2", "Accept": "application/json, text/plain, */*"},
    response_check=lambda response: True if "10000" in response.content else False,
    dag=dag)
于 2017-09-11T15:40:28.940 回答