我想触发一个简单的httpoperator,像这样:airflow trigger_dag test_trigger --conf '{"name":"something"}'
然后我使用 pythonoperator python_callable 通过使用 kwargs['dag_run'].conf 来接受参数,我想将 ['dag_run'].conf 传递给 simplehttpoperator,我该怎么做?有人可以帮忙吗?
cc_ = {}
def run_this_func(ds, **kwargs):
cc_ = kwargs['dag_run'].conf
logging.info(cc_)
return cc_
run_this = PythonOperator(
task_id='run_this',
provide_context=True,
python_callable=run_this_func,
dag=dag)
http_task = SimpleHttpOperator(
task_id='http_task',
http_conn_id='test_http',
method='POST',
endpoint='/api/v1/function',
data=cc_,
headers={"Authorization": "Basic YWRtaW46MTIzNDU2", "Accept": "application/json, text/plain, */*"},
response_check=lambda response: True if "10000" in response.content else False,
dag=dag)
http_task.set_upstream(run_this)