问题标签 [google-cloud-composer]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
google-cloud-platform - 使用 google-cloud-bigquery python 客户端库的 Google Cloud Composer
我正在尝试在 Google Cloud Composer 中运行 DAG,其中第一个组件是使用 http GET 请求调用 API,然后使用 python-client 库将 json 插入 BigQuery 表中。我正在尝试运行此功能:https ://googlecloudplatform.github.io/google-cloud-python/latest/bigquery/generated/google.cloud.bigquery.client.Client.insert_rows_json.html
我添加了 PYPI 包:
请求 ===2.19.1
numpy ===1.12.0
谷歌云大查询 ===1.4.0
我收到以下错误: Broken DAG: [/home/airflow/gcs/dags/composer_test_july30_v2.py] 'Client' 对象 在 Airflow UI 控制台中没有属性 'get_table' 。
显示的所有代码都可以在本地工作,但无法使用 Cloud Composer。
google-cloud-composer - 更改作曲家 UI 网址
有办法更改自动生成的网址吗?
https://xxxxxxxxxxxxxxxxx-tp.appspot.com
对于https://custom-name-tp.appspot.com
或类似的东西。
谢谢
airflow - Apache Airflow - 如何在使用 TriggerDagRunOperator 触发的流中检索运算符外部的 dag_run 数据
我设置了两个 DAG,我们称第一个为 orchestrator,第二个为 worker。Orchestrator 的工作是从 API 中检索一个列表,并针对该列表中的每个元素,使用一些参数触发工作 DAG。
我将这两个工作流程分开的原因是我希望能够仅重播失败的“工作”工作流程(如果一个失败,我不想重播所有工作实例)。
我能够让事情正常进行,但现在我看到了监控是多么困难,因为我的 task_id 对所有人都是一样的,所以我决定根据“orchestrator”工作流从 API 检索到的值来使用动态 task_id。
但是,我无法从运算符外部的 dag_run 对象中检索值。基本上,我希望这个工作:
但我无法定义这个“上下文”对象。但是,我可以从运算符(例如 PythonOperator 和 BashOperator)访问它。
是否可以在操作员之外检索 dag_run 对象?
google-cloud-platform - 我可以重新启动 Cloud Composer 环境吗?
我使用Google Cloud
作曲家几天了,主要是为了将数据从 MySQL 移动到 BigQuery,它工作正常。
在某些时候,它停止工作:
运行任务运行很长时间然后失败
任务没有开始
新dags有评论
This DAG seems to be existing only locally. The master scheduler doesn't seem to be aware of its existence
我已经使用 Airflow Web UI 清理了所有 DAG 运行和任务实例,但仍然无法正常工作。
有没有办法在不丢失已完成任务结果的情况下重新启动环境?有没有其他方法可以让 Airflow 手动运行?
google-cloud-composer - 添加与 gdrive 范围的 BigQuery 连接?
我有一个外部表格表,我想通过 Airflow 中的 BigQueryOperator 进行查询。
我更喜欢使用 Cloud Composer 服务帐户。
我通过 Airflow UI 使用以下参数创建了一个新连接:
在我的 DAG 中,我使用:BigQueryOperator(..., bigquery_conn_id='bigquery_with_gdrive_scope')
日志报告:Access Denied: BigQuery BigQuery: No OAuth token with Google Drive scope was found.
任务属性显示:bigquery_conn_id bigquery_with_gdrive_scope
就好像bigquery_conn_id
参数被忽略了一样。
airflow - 如何从气流打包的 DAG 中读取配置文件?
Airflow封装的 DAG似乎是合理的生产气流部署的重要组成部分。
我有一个由配置文件驱动的带有动态 subDAG 的 DAG,例如:
配置.yaml:
这会产生 subdag 任务,例如imports.project_{foo|bar}.step{1|2|3}
.
我通常使用 python 的open
函数读取配置文件,a laconfig = open(os.path.join(os.path.split(__file__)[0], 'config.yaml')
不幸的是,在使用打包的 DAG 时,这会导致错误:
有什么想法/最佳实践可以在这里推荐吗?
google-bigquery - Airflow:将 BigQuery 结果作为 XCom 传递的最佳方式
我用来将Cloud Composer (Airflow)
数据从日常导出。MySQL 表的增量 id 为. 要每天只附加新行,我需要:MySQL
BigQuery
PRIMARY KEY
- 查询 BQ 表找到
max(id) AS bq_max_id
- 将 MySQL 新数据导出
WHERE id > bq_max_id
到 GCS - 将 GCS 数据加载到 BQ
要做第一步,我需要使用 2 个运算符
然后我用 aBashOperator
来删除 xcom_value_table。
有没有比创建表和使用 3 个运算符更好的方法将 BQ 查询结果作为 XCom 传递?
google-cloud-platform - Cloud Composer (Airflow) 作业卡住
由于我取消了一个耗时太长的任务实例(我们称之为任务 A),因此我的托管服务卡了几个Cloud Composer
小时Airflow
我已经清除了所有 DAG 运行和任务实例,但是有几个作业正在运行,一个作业处于 Shutdown 状态(我想是任务 A 的作业)(我的作业的快照)。
此外,调度程序似乎没有运行,因为最近删除的 DAG 一直出现在仪表板中
有没有办法杀死作业或重置调度程序?任何摆脱作曲家的想法都将受到欢迎。
python - 将返回值从运算符传递给气流中的后续运算符
我正在尝试为该source_objects
字段提供一个字符串列表,GoogleCloudStorageToBigQueryOperator
但使用以下代码时出现错误:
字符串索引必须是整数,而不是 unicode
我不知道的事情:
- 如何在 DAG 范围内获得 XCOM的
return
值?get_file_name
- 如何
xcom_pull
在 DAG 范围内调用函数而无需提供上下文?在我看来,任务实例不需要提供上下文。
我想到的事情:
- 重写操作符并将 XCOM 作为参数
我想做的事情:
- 我希望能够打电话给接线员
另外,操作员的某些字段似乎使用了一个名为 的功能templated_field
,模板字段背后的机制是什么?不只是为了PythonOperator
andBashOperator
吗?
最后一个,为什么不PythonOperator
返回一个TaskInstance
?
我对 Python 和 Airflow 有点陌生。我猜这些事情应该是微不足道的。我确定我在这里误解了一些东西。
mysql - 不能从谷歌云作曲家做 Mysql 转储
我正在尝试创建一个简单的 dag,将 mysql 转储到这样的存储桶中:
我创建了一个 MySQL 连接mysql_instance_connection
,使用主机的 IP 地址、架构的数据库名称、在线创建数据库时设置的登录名和密码以及端口为 3306。
当我运行 dag 时,出现以下错误:
_mysql_exceptions.OperationalError: (2003, "Can't connect to MySQL server on '[IP_ADDRESS]' (110)")@-@{"task-id": "extract_data", "execution-date": "2018-08-21T16:35:59.161183", "workflow": "extractMysqlData"}
我需要做什么才能授予作曲家访问我的 MySQL 数据库的权限?