气流达格:
def print_hello(**kwargs):
task_params = kwargs['dag_run'].conf['task_payload']
print('Hello world a with {}'.format(task_params))
def hello_world():
args = {
"start_date": datetime(2022, 5, 1),
"retries": 1,
"sla": timedelta(hours=3)
}
dag_name = 'hello_world_a'
dag = dag_utils.create_dag(dag_name, args=args, schedule_interval=None)
test = PythonOperator(
task_id='hello_world_printer',
python_callable=print_hello,
provide_context=True,
dag=dag)
with_done_check((test),
date_format="{{ ds }}",
run_mode=get_mode())
return dag
Api curl --location --request POST 'http://loclahost:8080/api/experimental/dags/hello_world_a/dag_runs'
--header 'Content-Type: application/json'
--header 'Cache-Control: no-缓存'
--data-raw '{"conf":"{"mesage":"test"}"}'
请让我知道为什么我的 dag 处于运行状态,但没有被执行