我用来将Cloud Composer (Airflow)
数据从日常导出。MySQL 表的增量 id 为. 要每天只附加新行,我需要:MySQL
BigQuery
PRIMARY KEY
- 查询 BQ 表找到
max(id) AS bq_max_id
- 将 MySQL 新数据导出
WHERE id > bq_max_id
到 GCS - 将 GCS 数据加载到 BQ
要做第一步,我需要使用 2 个运算符
select_max_id = bigquery_operator.BigQueryOperator(
task_id='select_max_id',
bql="""
SELECT max(id) AS max_id
FROM some_schema.some_table
""",
use_legacy_sql=False,
destination_dataset_table='some_schema.xcom_value_table')
pull_xcom_value = bigquery_get_data.BigQueryGetDataOperator(
task_id='pull_xcom_value',
dataset_id='some_schema',
table_id='xcom_value_table')
然后我用 aBashOperator
来删除 xcom_value_table。
有没有比创建表和使用 3 个运算符更好的方法将 BQ 查询结果作为 XCom 传递?