5

BigQueryOperator在 Google Cloud Composer 上的 Airflow DAG 中广泛使用。

对于更长的查询,最好将每个查询放在它自己的.sql文件中,而不是用它弄乱 DAG。正如您在文档中看到的那样,Airflow 似乎支持所有 SQL 查询运算符,包括 BigQueryOperator 。

我的问题:在模板文件中编写了我的 sql 语句后.sql,如何将其添加到 Google Cloud Composer 并在 DAG 中引用它?

4

3 回答 3

5

在谷歌搜索并找到这个相关问题之后。我找到了一种方法来完成这项工作(尽管这不是理想的解决方案,我们将看到)。这是一个包含三个部分的工作示例:

  1. 带有一点 jinja 模板的 sql 模板文件,
  2. DAG 和
  3. gcloud将模板上传到正确位置所需的命令。

(1) sql 模板文件 这只是一个文件名以.sql扩展名结尾的文本文件。假设这个文件被调用my-templated-query.sql并包含:

SELECT COUNT(1)
FROM mytable
WHERE _PARTITIONTIME = TIMESTAMP('{{ ds }}')

(2)引用 DAG 文件中的模板 要引用此模板,请创建如下操作符:

count_task = BigQueryOperator(
  task_id='count_rows',
  sql='/my-templated-query.sql')

(3)将模板文件添加到 Google Cloud Composer 原来,airflow 默认在 dags 文件夹中查找模板文件。要将我们的模板文件上传到 dags 文件夹,我们运行

gcloud beta composer environments storage dags import --environment my-env-name --location us-central1 --source path/to/my-templated-query.sql

您必须相应地替换环境名称、位置和源路径。

将所有这些模板上传到 dag 文件夹似乎并不正确。更好的 Airflow 做法是将模板放在它们自己的文件夹中,并在创建 DAG 时指定指向它template_searchpath的参数。但是,我不确定如何使用 Google Cloud Composer 执行此操作。

更新:我意识到可以将子文件夹放在 DAG 文件夹中,这对于组织大量 SQL 模板很有用。假设我DAG_FOLDER/dataset1/table1.sql 在 BigQueryOperator 中放了一个 SQL 模板文件,然后可以使用sql=/dataset1/table1.sql. 如果您有一个包含大量文件的子文件夹和许多其他子文件夹,您还可以使用dag import上面显示的 I 递归上传整个子文件夹 - 只需将其指向子文件夹即可。

于 2018-06-08T13:04:03.427 回答
4

我找到了解决这个问题的理想方法。在您的 dag 声明中,您可以设置template_searchpathAirflow 查找 jinja 模板文件的默认路径。

为了使它在您的 Cloud Composer 实例中工作,您必须将其设置为以下

dag = DAG(
    ...
    template_searchpath=["/home/airflow/gcs/plugins"],
)

请注意,我在此示例中使用了 plugins 文件夹。您可以改用您的数据文件夹或您希望在存储桶中包含的任何文件夹。

于 2021-06-29T14:25:09.577 回答
0

我们最近使用类似的策略解决了这个问题。步骤是:

  1. 将所有 SQL 文件放入Google Cloud Source Repository
  2. 在每次 DAG 运行开始时,将文件克隆到自动与您的 Airflow 环境共享的 Cloud Storage Bucket 中的“data”目录中。
  3. 在执行时使用BigQueryOperator.

这是一个最小的解决方案:

from airflow.operators import bash_operator
from airflow.contrib.operators import bigquery_operator

with models.DAG(
        'bigquery_dag',
        schedule_interval = None ,
        template_searchpath = ['/home/airflow/gcs/data/repo/queries/'],
        default_args = default_dag_args
        ) as dag:

    t1_clean_repo = bash_operator.BashOperator(
        task_id = 'clean_repo',
        bash_command = 'rm -rf /home/airflow/gcs/data/repo'
    )

    clone_command = """
        gcloud source repos clone repo --project=project_id
        cp -R repo /home/airflow/gcs/data
    """

    t2_clone_repo = bash_operator.BashOperator(
        task_id='clone_repo',
        bash_command=clone_command
        )

    t3_query = bigquery_operator.BigQueryOperator(
        task_id='query',
        sql= 'query.sql',
        use_legacy_sql = False,
        bigquery_conn_id='conn_id'
    )

我们在这里利用了一些重要的概念:

  1. Cloud Storage Bucket 中的数据目录会通过Fuse自动与您的 Airflow 实例共享。大多数操作员都可以访问此处放置的任何内容。
  2. 只要您的 Google Cloud Source 存储库与 Cloud Composer 位于同一项目中,您的 Airflow 实例就不需要额外git clone的文件权限。
  3. 我们template_searchpath在 DAG 参数中设置 ,扩展搜索范围以将data目录包含在 Cloud Storage Bucket 中。
于 2020-07-07T23:48:17.603 回答