
I just set up AWS MWAA (Managed Airflow) and am playing around with running a simple bash script in a DAG. I was reading the task's logs and noticed that, by default, the task looks for the aws_default connection and tries to use it, but doesn't find it.

I went to the connections pane and set up the aws_default connection, but it still shows the same messages in the logs:

Airflow Connection: aws_conn_id=aws_default

No credentials retrieved from Connection

*** Reading remote log from Cloudwatch log_group: airflow-mwaa-Task log_stream: dms-postgres-dialog-label-pg/start-replication-task/2021-11-22T13_00_00+00_00/1.log.
[2021-11-23 13:01:02,487] {{logging_mixin.py:104}} INFO - [2021-11-23 13:01:02,486] {{base_aws.py:368}} INFO - Airflow Connection: aws_conn_id=aws_default
[2021-11-23 13:01:02,657] {{logging_mixin.py:104}} INFO - [2021-11-23 13:01:02,656] {{base_aws.py:179}} INFO - No credentials retrieved from Connection
[2021-11-23 13:01:02,678] {{logging_mixin.py:104}} INFO - [2021-11-23 13:01:02,678] {{base_aws.py:87}} INFO - Creating session with aws_access_key_id=None region_name=us-east-1
[2021-11-23 13:01:02,772] {{logging_mixin.py:104}} INFO - [2021-11-23 13:01:02,772] {{base_aws.py:157}} INFO - role_arn is None

How do I get MWAA to recognize this connection?

My DAG:

from datetime import datetime, timedelta, tzinfo
import pendulum

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

local_tz = pendulum.timezone("America/New_York")
start_date = datetime(2021, 11, 9, 8, tzinfo=local_tz)
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}
with DAG(
    'dms-postgres-dialog-label-pg-test',
    default_args=default_args,
    description='',
    schedule_interval=timedelta(days=1),
    start_date=start_date,
    tags=['example'],
) as dag:

    t1 = BashOperator(
        task_id='start-replication-task',
        bash_command="""
        aws dms start-replication-task --replication-task-arn arn:aws:dms:us-east-1:blah --start-replication-task-type reload-target
        """,
    )

    t1

Edit: For now, I'm just importing a built-in hook and using it to grab the credentials. Example:

from airflow.hooks.base import BaseHook
conn = BaseHook.get_connection('aws_service_account')
...

print(conn.host)
print(conn.login)
print(conn.password)
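
For completeness, here is a minimal sketch of how those values can be fed into boto3. The connection id aws_service_account and the field mapping (access key id in login, secret in password) are my own convention, not anything MWAA sets up:

import boto3
from airflow.hooks.base import BaseHook

# 'aws_service_account' is a hypothetical, manually created connection:
# access key id stored in `login`, secret access key in `password`.
conn = BaseHook.get_connection('aws_service_account')
session = boto3.session.Session(
    aws_access_key_id=conn.login,
    aws_secret_access_key=conn.password,
    region_name='us-east-1',
)
dms = session.client('dms')  # e.g. dms.describe_replication_tasks()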

1 Answer


Updating this after going back and forth with AWS support.

Use the execution role that MWAA creates, rather than an access key ID and secret in aws_default. To use a custom access key ID and secret, follow @Jonathan Porter's recommendation from his answer to this question:

from airflow.hooks.base import BaseHook
conn = BaseHook.get_connection('aws_service_account')
...

print(conn.host)
print(conn.login)
print(conn.password)
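
A related sketch (my assumption, not part of the original answer): the AWS hooks in the Amazon provider accept an aws_conn_id, so the same custom connection can back a boto3 client directly instead of being read field by field:

from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook

# 'aws_service_account' is the same hypothetical custom connection;
# the hook builds its boto3 session from it instead of aws_default.
hook = AwsBaseHook(aws_conn_id='aws_service_account', client_type='dms')
dms_client = hook.get_conn()  # boto3 DMS client using those credentials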

However, if you want to use the specific execution role that MWAA provides, that is the default within MWAA. Confusingly, the info messages state that no credentials were retrieved from the connection, yet the execution role is still used, for example with the KubernetesPodOperator:

[2021-11-23 13:01:02,487] {{logging_mixin.py:104}} INFO - [2021-11-23 13:01:02,486] {{base_aws.py:368}} INFO - Airflow Connection: aws_conn_id=aws_default
[2021-11-23 13:01:02,657] {{logging_mixin.py:104}} INFO - [2021-11-23 13:01:02,656] {{base_aws.py:179}} INFO - No credentials retrieved from Connection
[2021-11-23 13:01:02,678] {{logging_mixin.py:104}} INFO - [2021-11-23 13:01:02,678] {{base_aws.py:87}} INFO - Creating session with aws_access_key_id=None region_name=us-east-1
[2021-11-23 13:01:02,772] {{logging_mixin.py:104}} INFO - [2021-11-23 13:01:02,772] {{base_aws.py:157}} INFO - role_arn is None

For example, the following automatically uses the .aws/credentials set up by the execution role within the MWAA environment:

from datetime import timedelta
from airflow import DAG
from datetime import datetime
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

default_args = {
    'owner': 'aws',
    'depends_on_past': False,
    'start_date': datetime(2019, 2, 20),
    'provide_context': True
}

dag = DAG(
    'kubernetes_pod_example', default_args=default_args, schedule_interval=None
)

# use a kube_config stored in the S3 dags folder for now
kube_config_path = '/usr/local/airflow/dags/kube_config.yaml'

podRun = KubernetesPodOperator(
    namespace="mwaa",
    image="ubuntu:18.04",
    cmds=["bash"],
    arguments=["-c", "ls"],
    labels={"foo": "bar"},
    name="mwaa-pod-test",
    task_id="pod-task",
    get_logs=True,
    dag=dag,
    is_delete_operator_pod=False,
    config_file=kube_config_path,
    in_cluster=False,
    cluster_context='aws',
    execution_timeout=timedelta(seconds=60)
)
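
To confirm which identity a task actually runs as, a quick check (a sketch using plain boto3; sts:GetCallerIdentity requires no extra permissions) can be dropped into any task:

import boto3

# With no explicit credentials, boto3 falls through the default provider
# chain, which inside MWAA resolves to the environment's execution role.
sts = boto3.client('sts', region_name='us-east-1')
print(sts.get_caller_identity()['Arn'])  # the assumed execution-role ARN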

Hope this helps anyone else stumbling through this.

answered 2022-01-27T17:52:26.397