1

我正在 Cloud Composer 上开发 DAG;我的代码被分成一个主 python 文件和一个带有子文件夹的包,它看起来像这样:

my_dag1.py
package1/__init__.py
package1/functions.py
package1/package2/__init__.py
package1/package2/more_functions.py

我更新了其中一个函数package1/functions.py以获取附加参数(并更新 中的引用my_dag1.py)。该代码将在我的本地环境中正确运行,并且在运行时我没有收到任何错误

gcloud beta composer environments run my-airflow-environment list_dags --location europe-west1

但是 Web UI 引发了 python 错误

TypeError:my_function() 得到了一个意外的关键字参数“new_argument”


  • 我试图重命名函数并将错误更改为 NameError: name 'my_function' is not defined

  • 我尝试更改 DAG 的名称并将文件上传到压缩和解压缩的 dag 文件夹,但没有任何效果。

只有在我重命名包文件夹后,错误才消失。


我怀疑这个问题与调度程序有关,my_dag1.py但不是package1/functions.py。该错误突然出现,因为我在前几周进行了类似的更新。

关于如何在不重构整个代码结构的情况下解决此问题的任何想法?


编辑-1

这是Google Groups 上相关讨论的链接

4

2 回答 2

1

我遇到了类似的问题。“Broken DAG”错误不会在 Web UI 中消失。我猜这是 AirFlow 的 Web 服务器中的缓存错误。

  • 背景。

    1. 我创建了一个具有Airflow 插件功能的自定义操作符。
    2. 导入自定义算子后,airflow Web UI一直Broken DAG提示找不到自定义算子。
  • 为什么我认为这是 Web 服务器 Airflow 中的错误?

    1. 我可以使用 command 手动运行 DAG airflow test,所以导入应该是正确的。
    2. 即使我从/dags/气流文件夹中删除了相关的 DAG 文件,错误仍然存​​在。
  • 以下是我为解决此问题所做的工作。

    1. 重新启动气流网络服务。(有时您只能通过此方法解决问题)。
    2. 确保没有 DAG 正在运行,重新启动气流调度程序服务。
    3. 确保没有 DAG 正在运行,重新启动气流工作者

希望它可以帮助遇到同样问题的人。

于 2019-01-14T23:57:11.457 回答
0

尝试使用以下命令重新启动网络服务器:

gcloud beta composer environments restart-web-server ENVIRONMENT_NAME --location=LOCATION

于 2021-07-01T13:46:54.983 回答