2

我正在尝试按以下方式在 AWS Airflow Managed Service 的 DAG 中使用 PostgresHook:

from airflow.hooks.postgres_hook import PostgresHook

使用此服务的 Airflow 版本是 1.10.12,但是当我上传此 DAG 时,Airflow UI 向我显示“Broken DAG: No module named 'psycopg2'”错误。

我有使用这些模块定义的 requirements.txt 文件,但似乎没有一个工作:

psycopg2-binary
psycopg2
tableauserverclient
google-auth
botocore
apache-airflow[postgres]

有谁知道是否有解决此问题的方法?AWS 论坛页面中没有太多关于此的信息。

4

1 回答 1

2

我在 MWAA 中使用 psycopg2 没有问题。

在我的要求中,我只有 psycopg2-binary 而不是 psycopg2。

这是我喜欢用来列出安装在我的 MWAA 气流环境中的所有 pip 包的 dag:

import os
from datetime import timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

DAG_ID = os.path.basename(__file__).replace('.py', '')

DEFAULT_ARGS = {
    'owner': 'Louis',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False
}

with DAG(
        dag_id=DAG_ID,
        default_args=DEFAULT_ARGS,
        description='Print all installed Python packages',
        dagrun_timeout=timedelta(hours=2),
        start_date=days_ago(1),
        schedule_interval=None,
        tags=['bash']
) as dag:
    list_python_packages_operator = BashOperator(
        task_id='list_python_packages',
        bash_command='python3 -m pip list'
    )

list_python_packages_operator

希望它有助于调试您的问题。

于 2021-02-16T14:23:09.257 回答