You can use BigQueryHook.patch_table() to update the expiration time after you have created your table. It accepts expiration_time
as a parameter.
NOTE: patch_table() only updates the parameters that are passed; all other table properties are left unchanged.
expiration_time (Optional[int]) -- [Optional] The time when this table
expires, in milliseconds since the epoch.
See code below:
import datetime
from airflow import models
from airflow.operators import python
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
# Start the schedule one day in the past so the first run happens
# promptly without backfilling any older history.
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

# Coordinates of the target BigQuery table -- replace with real values.
DATASET_NAME = 'your-dataset'
TABLE_NAME = 'your-table'
PROJECT_ID = 'your-project'

# Arguments applied to every task in the DAG.
default_args = dict(
    owner='Composer Example',
    depends_on_past=False,
    email=[''],
    email_on_failure=False,
    email_on_retry=False,
    retries=1,
    retry_delay=datetime.timedelta(minutes=5),
    start_date=YESTERDAY,
)
with models.DAG(
        'create_table_add_expiration',
        catchup=False,
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1)) as dag:

    # Load a CSV from GCS into BigQuery, replacing any existing contents
    # of the destination table.
    load_csv = GCSToBigQueryOperator(
        task_id='gcs_to_bigquery',
        bucket='bucket-name',
        source_objects=['folder/file.csv'],
        source_format='CSV',
        skip_leading_rows=1,  # the CSV has a header row
        destination_project_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.{TABLE_NAME}",
        autodetect=True,  # infer the schema from the file
        write_disposition='WRITE_TRUNCATE',
    )

    def patch_bq_table(**kwargs):
        """Set an expiration time on the table loaded by the previous task.

        BigQueryHook.patch_table() updates only the fields that are
        passed; every other table property is left untouched.
        """
        # gcp_conn_id replaces the deprecated bigquery_conn_id parameter;
        # delegate_to is likewise deprecated and its default (None) is omitted.
        hook = BigQueryHook(gcp_conn_id='bigquery_default', use_legacy_sql=False)
        hook.patch_table(
            dataset_id=DATASET_NAME,
            table_id=TABLE_NAME,
            project_id=PROJECT_ID,
            description="test",
            expiration_time=1646884330000,  # March 10, 2022 in epoch time (milliseconds)
        )

    # provide_context=True is deprecated in Airflow 2: the task context is
    # always passed to the callable, so the flag is omitted here.
    update_table = python.PythonOperator(
        task_id='add_expiration',
        python_callable=patch_bq_table,
    )

    load_csv >> update_table
Airflow test:
Updated table in BQ: