3

Airflow 允许您将 dags 依赖的依赖项(外部 python 代码到 dag 代码)放在 dag 文件夹中。这意味着这些外部 python 代码中的任何组件/成员或类都可用于 dag 代码。

但是,在执行此操作时(在云编写环境的 GCS dag 文件夹中),依赖项的组件对 dag 不可用。Airflow Web UI 中显示类似于以下内容的错误:Broken DAG: [/home/airflow/gcs/dags/....py] No module named tester. 其中 tester 是 dags 文件夹中的一个单独的 python 文件。

当使用 Google 的 SDK(运行实际的 Airflow 命令)测试这些任务时,这些任务运行良好,但似乎在 Kubernetes 的某个地方创建了这些容器作业,它似乎也没有接管依赖项。

我意识到 Cloud Compose 处于测试阶段,但我想知道我是否做错了什么。

4

4 回答 4

5

您应该将模块放在包含 __init__.py 文件的单独文件夹中(Airflow 不喜欢其顶级 DAGs 目录中的 __init__.py 文件)。

例如,如果您具有以下目录结构:

dags/
    my_dag.py
    my_deps/
        __init__.py
        dep_a.py
        dep_b.py

你可以写from my_deps import dep_a, dep_b进去my_dag.py

于 2018-05-30T19:13:02.163 回答
2

我遇到了同样的问题,并在邮件列表中得到了帮助。如需参考,请参阅此处的主题:https ://groups.google.com/forum/#!topic/cloud-composer-discuss/wTI7Pbwc6ZY 。有一个方便的 Github Gist 链接,其中也有一些评论。

为了将您自己的依赖项写入和导入到 DAG 中,您需要压缩 dag 及其依赖项,如下所述:https ://airflow.apache.org/concepts.html?highlight=zip#packaged-dags 。

您可以将该 zip 文件直接上传到您的 Cloud Composer GCS 存储桶,Airflow 会提取它。

确保您的依赖项是位于目录顶层的包,而不是模块dags

from foo_dep.foo_dep import my_utility_function将在这里工作:

foo_dag.py
foo_dep/__init__.py
foo_dep/foo_dep.py

from foo_dep import my_utility_function似乎它应该与以下 dags 目录结构一起使用(并且将在本地工作),但它不适用于 Airflow

foo_dag.py
foo_dep.py
于 2018-06-18T22:02:22.990 回答
2

您是否正在寻找如何安装 Python 依赖项?https://cloud.google.com/composer/docs/how-to/using/installing-python-dependencies

此外,位于 GCS 存储桶中的 DAGs 文件夹(gcloud beta composer environments describe [environment]获取此存储桶;gs://{composer-bucket}/dags)应映射到您的 pod 中的 /home/airflow/gcs/dags。您是否尝试过通过 SSH 连接到节点来找到它?

于 2018-05-17T19:41:03.350 回答
0

来自配置 Airflow 的官方文档:

第一次运行 Airflow 时,它会在 $AIRFLOW_HOME 目录(默认为 ~/airflow)中创建一个名为 airflow.cfg 的文件。此文件包含 Airflow 的配置,您可以对其进行编辑以更改任何设置

在这个文件中的第一个设置中

[core]
# The home folder for airflow, default is ~/airflow
airflow_home = /home/airflow/gcs/dags

气流的基本路径。

于 2018-05-17T09:30:52.997 回答