0

所以我的目标是创建一个带有 BigQueryOperators 的 Dag,我可以在我的 SQL 中使用参数化的目标表在 Airflow 中发送它。我检查了很多关于如何向 PythonOperators 发送参数以便在 Airflow 中使用 --conf 调用它们的主题,但我不知道如何将相同的方式应用于 BigQueryOperators 的参数。

我的 dag.py 看起来像这样:


import airflow
import blabla..
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

with DAG(
    "TestPython",
    schedule_interval=None,
    default_args=default_args,
    max_active_runs=1,
    catchup=False,
) as dag:


    stepOne = BigQueryOperator(
        task_id="stepOne",
        sql="SELECT * FROM `testTable` ",
        destination_dataset_table=" **variableTable** ",
        write_disposition="WRITE_TRUNCATE",
        use_legacy_sql=False,
    )

    stepOne

我想知道是否有办法使用气流 trigger_dag 命令或其他东西设置目标表名称(当然,当它没有设置时有一个默认值,所以它仍然可以上传到我的 Dag 存储桶中)

如果有什么不清楚的地方,我可以提供更多的细节和我尝试做的方法。

4

1 回答 1

0

是的,您可以将运行时值传递给“destination_dataset_table”,因为它是一个模板化字段。

例如:

my_suffix = "{{ macros.ds_format(macros.ds_add(ds, -2), "%Y-%m-%d", "%Y%m%d") }}"
stepOne = BigQueryOperator(
    task_id="stepOne",
    sql="SELECT * FROM `testTable` ",
    destination_dataset_table=f"project_id.dataset_id.table_prefix_{my_suffix}",
    write_disposition="WRITE_TRUNCATE",
    use_legacy_sql=False,
)

在我的示例中,我使用 Airflow 宏来更改表名来操作日期,但您可以使用许多其他方法,例如 XCOM:

"{{ task_instance.xcom_pull(task_ids='task_id', key='return_value') }}"

对于您的特定用例,我认为这个答案应该有效。

您可以使用 --conf '{"key":"value"}' 从 CLI 传递参数,然后在 DAG 文件中将其用作模板字段中的 "{{ dag_run.conf["key"] }}"。

于 2021-03-21T13:50:15.500 回答