1

我正在考虑将 Airflow 从 ECS 转移到 MWAA,这显然效果很好。但从 CICD 的角度来看,加载连接有一些限制。根据文档,我们无法根据他们在此处所说的内容使用命令行设置连接:https ://docs.aws.amazon.com/mwaa/latest/userguide/access-airflow-ui.html#call-mwaa-apis-cli

我知道我们可以使用 UI 来做到这一点,但这不是本意。有谁知道如何处理这个问题,或者是否有任何方法可以自动处理?

非常感谢哈维

4

1 回答 1

0

您可以在 DAG 中添加一个步骤,如果环境中尚不存在连接,它将以编程方式添加连接:

def add_connection_callable(**kwargs):
    connection: Connection = Connection(
        conn_id="foo",
        conn_type="HTTP",
        host=kwargs['host'],
        port=kwargs['port']
    )
    session: Session = settings.Session
    db_connection: Connection = session.query(Connection) \
        .filter(Connection.conn_id == "foo") \
        .first()
    if db_connection is None:
        logging.info("Adding connection \"foo\"..")
        session.add(connection)
        session.commit()
    else:
        logging.info("Connection \"foo\" already exists.")

add_connection: PythonOperator = PythonOperator(
    task_id="add_connection",
    python_callable=add_connection_callable,
    op_kwargs={
        "host": "{{ dag_run.conf['Host'] }}",
        "port": "{{ dag_run.conf['Port'] }}"
    },
    dag=dag)
于 2021-03-07T00:33:49.213 回答