0

气流达格:

def print_hello(**kwargs):
    task_params = kwargs['dag_run'].conf['task_payload']
    print('Hello world a with {}'.format(task_params))


def hello_world():
    args = {
        "start_date": datetime(2022, 5, 1),
        "retries": 1,
        "sla": timedelta(hours=3)
    }

    dag_name = 'hello_world_a'
    dag = dag_utils.create_dag(dag_name, args=args, schedule_interval=None)


    test = PythonOperator(
        task_id='hello_world_printer',
        python_callable=print_hello,
        provide_context=True,
        dag=dag)

    with_done_check((test),
                    date_format="{{ ds }}",
                    run_mode=get_mode())

    return dag

Api curl --location --request POST 'http://loclahost:8080/api/experimental/dags/hello_world_a/dag_runs'
--header 'Content-Type: application/json'
--header 'Cache-Control: no-缓存'
--data-raw '{"conf":"{"mesage":"test"}"}'

请让我知道为什么我的 dag 处于运行状态,但没有被执行

4

1 回答 1

0

尝试在 中切换月份和日期值start_datedatetime.datetime()是年/月/日格式。Airflow 很可能要等到 5 月 1 日。

于 2022-01-06T20:54:02.473 回答