2

我用来将Cloud Composer (Airflow)数据从日常导出。MySQL 表的增量 id 为. 要每天只附加新行,我需要:MySQLBigQueryPRIMARY KEY

  1. 查询 BQ 表找到max(id) AS bq_max_id
  2. 将 MySQL 新数据导出WHERE id > bq_max_id到 GCS
  3. 将 GCS 数据加载到 BQ

要做第一步,我需要使用 2 个运算符

  select_max_id = bigquery_operator.BigQueryOperator(
        task_id='select_max_id',
        bql="""
            SELECT max(id) AS max_id
            FROM some_schema.some_table
        """,
        use_legacy_sql=False,
        destination_dataset_table='some_schema.xcom_value_table')

  pull_xcom_value = bigquery_get_data.BigQueryGetDataOperator(
        task_id='pull_xcom_value',
        dataset_id='some_schema',
        table_id='xcom_value_table')

然后我用 aBashOperator来删除 xcom_value_table。

有没有比创建表和使用 3 个运算符更好的方法将 BQ 查询结果作为 XCom 传递?

4

0 回答 0