
I'm using a GCSToBigQueryOperator task in Airflow to copy a CSV into a new BQ table. Is there a way to add a table expiration to the table within this task?

new_table_task = GCSToBigQueryOperator(
    task_id='insert_gcs_to_bq_tmp_table',
    bucket=BUCKET,
    source_objects=SOURCE_PATH,
    destination_project_dataset_table=f"{BQ_PROJECT}.{DATASET}.{tmp_table_name}", 
    write_disposition='WRITE_TRUNCATE',
    skip_leading_rows=1,
    schema_object=SCHEMA_OBJECT
)

If that isn't possible, is my best option to create the table first and define the expiration with DDL, and then use GCSToBigQueryOperator? Thanks!


2 Answers


You can use BigQueryHook.patch_table() to update the expiration after you have created the table. It accepts expiration_time as a parameter.

NOTE: patch_table() only updates the parameters that are passed; everything else on the table is left as-is.

expiration_time (Optional[int]) -- [Optional] The time when this table expires, in milliseconds since the epoch.
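
For context, such a value can be computed with the standard library; a minimal sketch assuming a seven-day expiration:

import datetime

# Expiration seven days from now, in milliseconds since the Unix epoch
expiration_ms = int(
    (datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=7)).timestamp() * 1000
)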

See code below:

import datetime

from airflow import models
from airflow.operators import python

from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)
DATASET_NAME = 'your-dataset'
TABLE_NAME = 'your-table'
PROJECT_ID = 'your-project'

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': YESTERDAY,
}

with models.DAG(
        'create_table_add_expiration',
        catchup=False,
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1)) as dag:

    load_csv = GCSToBigQueryOperator(
        task_id='gcs_to_bigquery',
        bucket='bucket-name',
        source_objects=['folder/file.csv'],
        source_format='CSV',
        skip_leading_rows=1,
        destination_project_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.{TABLE_NAME}",
        autodetect=True,
        write_disposition='WRITE_TRUNCATE',
    )

    def patch_bq_table(**kwargs):
        # Uses the default BigQuery connection configured in Airflow
        hook = BigQueryHook(bigquery_conn_id='bigquery_default', delegate_to=None, use_legacy_sql=False)
        hook.patch_table(
            dataset_id=DATASET_NAME,
            table_id=TABLE_NAME,
            project_id=PROJECT_ID,
            description="test",
            expiration_time=1646884330000,  # March 10, 2022, in epoch milliseconds
        )

    update_table = python.PythonOperator(
        task_id='add_expiration',
        provide_context=True,
        python_callable=patch_bq_table,
    )

    load_csv >> update_table

Airflow test: (screenshot of the successful DAG run)

Updated table in BQ: (screenshot of the table showing the new expiration time)
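
Note: in newer versions of the Google provider, patch_table() is deprecated in favor of BigQueryHook.update_table(). A minimal, unverified sketch of the equivalent call, spelling the field as in the table's API representation (the style Airflow's BigQueryUpdateTableOperator examples use):

def update_bq_table(**kwargs):
    hook = BigQueryHook(use_legacy_sql=False)
    # Only the properties listed in `fields` are overwritten
    hook.update_table(
        table_resource={"expirationTime": "1646884330000"},  # epoch milliseconds, as a string
        fields=["expirationTime"],
        dataset_id=DATASET_NAME,
        table_id=TABLE_NAME,
        project_id=PROJECT_ID,
    )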

answered 2022-03-01T04:04:49.113

Running a BigQueryCreateEmptyTableOperator with a table expiration time before the GCSToBigQueryOperator should solve the problem.

Airflow BigQueryCreateEmptyTableOperator documentation

BigQueryCreateEmptyTableOperator(
    ...
    table_resource={
        "tableReference": {"tableId": ""},
        "expirationTime": "",  # expiration in milliseconds since the epoch, as a string
    },
)
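
Put together, a rough sketch of that ordering, reusing the PROJECT_ID / DATASET_NAME / TABLE_NAME names from the first answer (bucket and object names are placeholders, the expirationTime value is the epoch-millisecond example from above, and WRITE_APPEND keeps the load from replacing the pre-created table):

from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateEmptyTableOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

# 1) Create the table with an expiration time
create_table = BigQueryCreateEmptyTableOperator(
    task_id='create_table_with_expiration',
    dataset_id=DATASET_NAME,
    table_id=TABLE_NAME,
    table_resource={
        "tableReference": {
            "projectId": PROJECT_ID,
            "datasetId": DATASET_NAME,
            "tableId": TABLE_NAME,
        },
        "expirationTime": "1646884330000",  # epoch milliseconds, as a string
    },
)

# 2) Load the CSV into the pre-created table
load_csv = GCSToBigQueryOperator(
    task_id='gcs_to_bigquery',
    bucket='bucket-name',
    source_objects=['folder/file.csv'],
    source_format='CSV',
    skip_leading_rows=1,
    destination_project_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.{TABLE_NAME}",
    write_disposition='WRITE_APPEND',
)

create_table >> load_csv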

answered 2022-03-01T03:38:46.507