问题标签 [google-cloud-composer]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
1260 浏览

google-cloud-platform - 使用 google-cloud-bigquery python 客户端库的 Google Cloud Composer

我正在尝试在 Google Cloud Composer 中运行 DAG,其中第一个组件是使用 http GET 请求调用 API,然后使用 python-client 库将 json 插入 BigQuery 表中。我正在尝试运行此功能:https ://googlecloudplatform.github.io/google-cloud-python/latest/bigquery/generated/google.cloud.bigquery.client.Client.insert_rows_json.html

我添加了 PYPI 包:

请求 ===2.19.1

numpy ===1.12.0

谷歌云大查询 ===1.4.0

我收到以下错误: Broken DAG: [/home/airflow/gcs/dags/composer_test_july30_v2.py] 'Client' 对象 在 Airflow UI 控制台中没有属性 'get_table' 。

显示的所有代码都可以在本地工作,但无法使用 Cloud Composer。

0 投票
1 回答
167 浏览

google-cloud-composer - 更改作曲家 UI 网址

有办法更改自动生成的网址吗?

https://xxxxxxxxxxxxxxxxx-tp.appspot.com对于https://custom-name-tp.appspot.com或类似的东西。

谢谢

0 投票
2 回答
3630 浏览

airflow - Apache Airflow - 如何在使用 TriggerDagRunOperator 触发的流中检索运算符外部的 dag_run 数据

我设置了两个 DAG,我们称第一个为 orchestrator,第二个为 worker。Orchestrator 的工作是从 API 中检索一个列表,并针对该列表中的每个元素,使用一些参数触发工作 DAG。

我将这两个工作流程分开的原因是我希望能够仅重播失败的“工作”工作流程(如果一个失败,我不想重播所有工作实例)。

我能够让事情正常进行,但现在我看到了监控是多么困难,因为我的 task_id 对所有人都是一样的,所以我决定根据“orchestrator”工作流从 API 检索到的值来使用动态 task_id。

但是,我无法从运算符外部的 dag_run 对象中检索值。基本上,我希望这个工作:

但我无法定义这个“上下文”对象。但是,我可以从运算符(例如 PythonOperator 和 BashOperator)访问它。

是否可以在操作员之外检索 dag_run 对象?

0 投票
2 回答
2655 浏览

google-cloud-platform - 我可以重新启动 Cloud Composer 环境吗?

我使用Google Cloud作曲家几天了,主要是为了将数据从 MySQL 移动到 BigQuery,它工作正常。

在某些时候,它停止工作:

  • 运行任务运行很长时间然后失败

  • 任务没有开始

  • 新dags有评论This DAG seems to be existing only locally. The master scheduler doesn't seem to be aware of its existence

我已经使用 Airflow Web UI 清理了所有 DAG 运行和任务实例,但仍然无法正常工作。

有没有办法在不丢失已完成任务结果的情况下重新启动环境?有没有其他方法可以让 Airflow 手动运行?

0 投票
2 回答
1788 浏览

google-cloud-composer - 添加与 gdrive 范围的 BigQuery 连接?

我有一个外部表格表,我想通过 Airflow 中的 BigQueryOperator 进行查询。

我更喜欢使用 Cloud Composer 服务帐户。

我通过 Airflow UI 使用以下参数创建了一个新连接:

在我的 DAG 中,我使用:BigQueryOperator(..., bigquery_conn_id='bigquery_with_gdrive_scope')

日志报告:Access Denied: BigQuery BigQuery: No OAuth token with Google Drive scope was found.

任务属性显示:bigquery_conn_id bigquery_with_gdrive_scope

就好像bigquery_conn_id参数被忽略了一样。

0 投票
1 回答
2657 浏览

airflow - 如何从气流打包的 DAG 中读取配置文件?

Airflow封装的 DAG似乎是合理的生产气流部署的重要组成部分。

我有一个由配置文件驱动的带有动态 subDAG 的 DAG,例如:

配置.yaml:

这会产生 subdag 任务,例如imports.project_{foo|bar}.step{1|2|3}.

我通常使用 python 的open函数读取配置文件,a laconfig = open(os.path.join(os.path.split(__file__)[0], 'config.yaml')

不幸的是,在使用打包的 DAG 时,这会导致错误:

有什么想法/最佳实践可以在这里推荐吗?

0 投票
0 回答
1021 浏览

google-bigquery - Airflow:将 BigQuery 结果作为 XCom 传递的最佳方式

我用来将Cloud Composer (Airflow)数据从日常导出。MySQL 表的增量 id 为. 要每天只附加新行,我需要:MySQLBigQueryPRIMARY KEY

  1. 查询 BQ 表找到max(id) AS bq_max_id
  2. 将 MySQL 新数据导出WHERE id > bq_max_id到 GCS
  3. 将 GCS 数据加载到 BQ

要做第一步,我需要使用 2 个运算符

然后我用 aBashOperator来删除 xcom_value_table。

有没有比创建表和使用 3 个运算符更好的方法将 BQ 查询结果作为 XCom 传递?

0 投票
2 回答
6572 浏览

google-cloud-platform - Cloud Composer (Airflow) 作业卡住

由于我取消了一个耗时太长的任务实例(我们称之为任务 A),因此我的托管服务卡了几个Cloud Composer小时Airflow

我已经清除了所有 DAG 运行和任务实例,但是有几个作业正在运行,一个作业处于 Shutdown 状态(我想是任务 A 的作业)(我的作业的快照)。

此外,调度程序似乎没有运行,因为最近删除的 DAG 一直出现在仪表板中

有没有办法杀死作业或重置调度程序?任何摆脱作曲家的想法都将受到欢迎。

0 投票
1 回答
7835 浏览

python - 将返回值从运算符传递给气流中的后续运算符

我正在尝试为该source_objects字段提供一个字符串列表,GoogleCloudStorageToBigQueryOperator但使用以下代码时出现错误:

字符串索引必须是整数,而不是 unicode

我不知道的事情:

  • 如何在 DAG 范围内获得 XCOM的return值?get_file_name
  • 如何xcom_pull在 DAG 范围内调用函数而无需提供上下文?在我看来,任务实例不需要提供上下文。

我想到的事情:

  • 重写操作符并将 XCOM 作为参数

我想做的事情:

  • 我希望能够打电话给接线员

另外,操作员的某些字段似乎使用了一个名为 的功能templated_field,模板字段背后的机制是什么?不只是为了PythonOperatorandBashOperator吗?

最后一个,为什么不PythonOperator返回一个TaskInstance

我对 Python 和 Airflow 有点陌生。我猜这些事情应该是微不足道的。我确定我在这里误解了一些东西。

0 投票
0 回答
446 浏览

mysql - 不能从谷歌云作曲家做 Mysql 转储

我正在尝试创建一个简单的 dag,将 mysql 转储到这样的存储桶中:

我创建了一个 MySQL 连接mysql_instance_connection,使用主机的 IP 地址、架构的数据库名称、在线创建数据库时设置的登录名和密码以及端口为 3306。

当我运行 dag 时,出现以下错误:

_mysql_exceptions.OperationalError: (2003, "Can't connect to MySQL server on '[IP_ADDRESS]' (110)")@-@{"task-id": "extract_data", "execution-date": "2018-08-21T16:35:59.161183", "workflow": "extractMysqlData"}

我需要做什么才能授予作曲家访问我的 MySQL 数据库的权限?