问题标签 [google-cloud-composer]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
1408 浏览

google-cloud-platform - 将现有 Airflow DB 迁移到 Cloud Composer

有没有办法将现有的 Airflow 实例迁移到 Google Cloud Composer?

我们目前正在使用数据库的 postgres 运行我们自己的 Airflow 实例。理想情况下,我们能够保留 DAG 的现有历史,我认为这需要复制整个数据库。这可能吗?

0 投票
1 回答
840 浏览

airflow - 从另一个项目访问存储桶?

我在 VM 中有一个脚本,可以将数据写入另一个项目的存储桶中。

我想用 Airflow 安排这个脚本,但是当脚本需要写入数据时,我遇到了 IAM 访问问题:

AccessDeniedException: 403 148758369895-compute@developer.gserviceaccount.com 没有 storage.objects.list 访问 ******

要启动脚本,我使用以下命令:

如果我想使用 Google Cloud Shell 启动脚本,我需要使用gcloud auth login但如何使用 Airflow/Composer 来执行此操作?

我试过了

没有成功

0 投票
2 回答
11451 浏览

python - 气流,标记任务成功或在 dag 运行之前跳过它

我们有一个巨大的 DAG,有许多小而快的任务和一些大而耗时的任务。

我们只想运行 DAG 的一部分,而我们发现的最简单的方法是不添加我们不想运行的任务。问题是我们的 DAG 有很多共同依赖,所以当我们想要跳过一些任务时,不破坏 dag 成为一个真正的挑战。

有没有办法默认为任务添加状态?(对于每次运行),例如:

或者

0 投票
3 回答
1209 浏览

python - Airflow/Composer - 在 zip 打包的 DAG 中找不到模板

我无法让模板化的 SQL 文件在 Composer 中工作。我认为问题与我将 DAG 打包为 zip 文件以包含其他代码这一事实有关。

我从这个开始(只显示相关部分):

文件结构如下所示:

我像这样压缩它并将其复制到 Composer dags 文件夹:

这在本地工作,但在 Composer 上部署时出错:

我确定我在 zip 中包含了 SQL 文件。我也尝试将它移动到根文件夹(没有 sql/ 子目录),但我得到了相同的结果。

template_searchpath我在某个地方读到了在实例化 DAG 对象时需要设置的地方。我无法成功地做到这一点。当我尝试相对路径 ( sql) 时,会出现更多TemplateNotFound错误。当我尝试如下绝对路径时,我得到not a directory.

这是我尝试过的:

我还尝试将“sql”作为任务路径的一部分而不是模板搜索路径,然后我再次尝试将所有内容移动到根级别,并得到相同的“不是目录”错误。

据我所知,问题与文件包含在 zip 中的事实有关。 __file__返回/home/airflow/gcs/dags/my_zip_file.zip/my_dag_file.py。但随后os.listdir(os.path.dirname(__file__))抛出相同的not a directory错误。所以可能是因为我们在 zip 存档中执行,我们不能以相同的方式使用文件夹和路径。也许Jinja被这件事绊倒了……?或者在打包 zip 文件时可能还有更多工作要做?

0 投票
1 回答
272 浏览

google-cloud-composer - 将外部工作人员连接到 Cloud Composer 气流

是否可以连接不属于 Cloud Composer Kubernetes 集群的外部工作人员?用例是将非云数据中心中的盒子连接到 Composer 集群。

0 投票
2 回答
1972 浏览

google-cloud-composer - “这个 DAG 似乎只存在于本地。主调度器似乎没有意识到它的存在。”

我开始尝试使用 Google Cloud Composer,在其中部署了一些 DAG:

在此处输入图像描述

我的一个 DAG 带有一条 info 语句,指示This DAG seems to be existing only locally. The master scheduler doesn't seem to be aware of its existence.无法运行,即使手动运行也是如此。当我手动启动它时,它会永远处于“运行”状态,并且永远不会开始运行第一个任务。

正如下面详细解释的那样,两个 DAG 之间的唯一区别是损坏的 DAG 使用的是自定义运算符。

您知道这里出了什么问题以及我该如何解决吗?

谢谢

  1. hello2_gcp_plugins_v2正在调用唯一的 bash 和电子邮件操作员按预期工作(我收到了电子邮件)。如果我配置一个 scheduler_interval 它会按预期启动。即使我将调度程序间隔设置为无,当我手动启动它时它运行良好
  2. hello2_gcp_plugins_v5正在调用我已经部署在预期存储桶中的自定义运算符。自定义操作符只是通过 HttpHook 调用 API 来获取数据并通过 GoogleCloudStorageHook 将其上传到 gcs 存储桶。无论调度程序间隔设置或保持为无,我总是在 UI 中看到 info 语句,并且 DAG 永远不会自动启动。手动启动时,它永远保持运行状态,并且永远不会触发第一个任务。
0 投票
1 回答
254 浏览

google-cloud-composer - PythonOperator 任务挂起访问 Cloud Storage 并按 SCHEDULED 堆叠

访问 Cloud Storage 时,我的 DAG 中的一项任务有时会挂起。似乎代码在download此处的函数处停止:

hook = GoogleCloudStorageHook(google_cloud_storage_conn_id='google_cloud_default') for input_file in hook.list(bucket, prefix=folder): hook.download(bucket=bucket, object=input_file)

在我的测试中,该文件夹包含一个 20Mb 的 json 文件。

该任务通常需要 20-30 秒,但在某些情况下它会运行 5 分钟,然后其状态会更新SCHEDULED并停留在那里(等待超过 6 小时)。我怀疑 5 分钟是由于配置原因,scheduler_zombie_task_threshold 300但不确定。

如果我在 Web UI 上手动清除任务,该任务会快速排队并再次正确运行。我通过设置execution_timeout正确更新任务FAILEDUP_FOR_RETRY状态超过 10 分钟来解决这个问题;但我想解决根本问题以避免依赖固定的超时阈值,有什么建议吗?

0 投票
1 回答
2774 浏览

airflow - 气流调度程序不断崩溃,数据库连接错误(谷歌作曲家)

我使用 Google Composer 已经有一段时间了(composer-0.5.2-airflow-1.9.0),并且在使用 Airflow 调度程序时遇到了一些问题。调度程序容器有时会崩溃,它可能会陷入无法启动任何新任务(数据库连接错误)的锁定状态,因此我必须重新创建整个 Composer 环境。这一次,有一个CrashLoopBackOff并且调度程序 pod 无法再重新启动。该错误与我之前遇到的错误非常相似。这是来自 Stackdriver 的回溯:

我对技术 RDBMS 错误一无所知。但是,这是一个具有默认环境的开箱即用的 Google Composer,所以我想知道是否有其他人遇到过类似的问题或知道发生了什么?我了解 Composer 使用 Google Cloud SQL 作为数据库,显然(?)MySQL 后端。

气流调度程序图像是gcr.io/cloud-airflow-releaser/airflow-worker-scheduler-1.9.0:cloud_composer_service_2018-06-19-RC3.

我必须补充一点,我在自制的 Airflow Kubernetes 设置中没有遇到这个调度程序问题,但后来我使用的是带有 PostgreSQL 的最先进的 Airflow 版本。

0 投票
2 回答
1973 浏览

airflow - 调度程序不更新包文件

我正在 Cloud Composer 上开发 DAG;我的代码被分成一个主 python 文件和一个带有子文件夹的包,它看起来像这样:


我更新了其中一个函数package1/functions.py以获取附加参数(并更新 中的引用my_dag1.py)。该代码将在我的本地环境中正确运行,并且在运行时我没有收到任何错误

但是 Web UI 引发了 python 错误

TypeError:my_function() 得到了一个意外的关键字参数“new_argument”


  • 我试图重命名函数并将错误更改为 NameError: name 'my_function' is not defined

  • 我尝试更改 DAG 的名称并将文件上传到压缩和解压缩的 dag 文件夹,但没有任何效果。

只有在我重命名包文件夹后,错误才消失。


我怀疑这个问题与调度程序有关,my_dag1.py但不是package1/functions.py。该错误突然出现,因为我在前几周进行了类似的更新。

关于如何在不重构整个代码结构的情况下解决此问题的任何想法?


编辑-1

这是Google Groups 上相关讨论的链接

0 投票
1 回答
193 浏览

google-cloud-platform - 调整节点配置中断 sqlproxy 和 scheduler

今天我尝试更改支持 Cloud Composer 环境的集群的节点类型并切换到 Ubuntu 映像而不是 COS。我这样做是通过在 GKE 集群中添加第二个节点池,然后删除第一个并迁移所有工作负载.

这会在气流-sqlproxy 日志中产生以下错误:

调度程序无法完全启动并发出以下堆栈跟踪:

似乎与支持 SQL 数据库的连接现在已断开。它仍然是同一个集群,但节点不同。我必须更新任何其他配置吗?