
I'm currently running this query in Airflow's MySqlOperator. How can I use Jinja templating to replace the region and S3 bucket with parameters?

  • Airflow version: 2.0.2
  • Python: 3.7
sql = """SELECT * FROM test
INTO OUTFILE S3 's3-ap-southeast-1://my-s3-bucket/my-key'
CHARACTER SET utf8
FORMAT CSV HEADER
FIELDS
  TERMINATED BY ','
  OPTIONALLY ENCLOSED BY '"'
LINES
  TERMINATED BY '\\n'
OVERWRITE ON;
"""
mysql_to_s3 = MySqlOperator(
  task_id="mysql_to_s3",
  dag=dag,
  sql=sql,
  mysql_conn_id=MYSQL_CONN_ID,
  parameters={
    "s3_bucket": "my-s3-bucket",
    "s3_key_prefix": "my-key",
    "region": "ap-southeast-1",
  },
  autocommit=False,
  database="test",
)

2 Answers


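You can use params to pass dynamic values into your SQL. Note that it is params (not parameters) that Jinja sees: params is rendered into the sql string before the query runs, whereas parameters is passed to the database driver as bind parameters at execution time: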

sql = """SELECT * FROM test
INTO OUTFILE S3 's3-{{ params.region }}://{{ params.s3_bucket }}/{{ params.s3_key_prefix }}'
CHARACTER SET utf8
FORMAT CSV HEADER
FIELDS
  TERMINATED BY ','
  OPTIONALLY ENCLOSED BY '"'
LINES
  TERMINATED BY '\\n'
OVERWRITE ON;
"""

from airflow.providers.mysql.operators.mysql import MySqlOperator

mysql_to_s3 = MySqlOperator(
  task_id="mysql_to_s3",
  dag=dag,
  sql=sql,
  mysql_conn_id=MYSQL_CONN_ID,
  params={
    "s3_bucket": "my-s3-bucket",
    "s3_key_prefix": "my-key",
    "region": "ap-southeast-1",
  },
  autocommit=False,
  database="test",
)
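To preview what will be sent to MySQL without running the task, a minimal sketch can render the same template locally with the jinja2 library, which Airflow uses for templating. It assumes jinja2 is installed and only approximates Airflow's rendering: a real task context also carries macros, var, ds and other values that are not provided here.

```python
from jinja2 import Template

# Same templated SQL as in the answer, abbreviated to a few clauses.
sql = """SELECT * FROM test
INTO OUTFILE S3 's3-{{ params.region }}://{{ params.s3_bucket }}/{{ params.s3_key_prefix }}'
CHARACTER SET utf8
FORMAT CSV HEADER
OVERWRITE ON;
"""

params = {
    "s3_bucket": "my-s3-bucket",
    "s3_key_prefix": "my-key",
    "region": "ap-southeast-1",
}

# The operator's params dict is exposed to Jinja under the name "params";
# Jinja resolves params.region on a dict by falling back to item lookup.
rendered = Template(sql).render(params=params)
print(rendered)
```

The printed INTO OUTFILE line should read 's3-ap-southeast-1://my-s3-bucket/my-key', matching the hard-coded query in the question.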

If these values are stored in Airflow Variables (region, s3_bucket, s3_key_prefix), you can remove the params dict from the operator and change your SQL to:

INTO OUTFILE S3 's3-{{ var.value.region }}://{{ var.value.s3_bucket }}/{{ var.value.s3_key_prefix }}'

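With either option, Airflow renders the templated sql string when the operator executes, replacing the placeholders with their values. You can check the actual values in the task instance's Rendered Template view in the Airflow UI.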
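To check the Variables version of the template locally, a minimal sketch can render it with jinja2. In a real task Airflow resolves var.value.<name> from its Variables store; here a hypothetical FakeVariableValues class stands in for that accessor so the example runs without Airflow. It assumes jinja2 is installed.

```python
from jinja2 import Template


class FakeVariableValues:
    """Hypothetical stand-in for Airflow's var.value accessor (not Airflow API)."""

    def __init__(self, values):
        self._values = values

    def __getattr__(self, name):
        # Raise AttributeError for unknown names so Jinja treats them as undefined.
        try:
            return self._values[name]
        except KeyError as exc:
            raise AttributeError(name) from exc


sql = (
    "INTO OUTFILE S3 's3-{{ var.value.region }}://"
    "{{ var.value.s3_bucket }}/{{ var.value.s3_key_prefix }}'"
)

# "var" is a plain dict here; Jinja resolves var.value via item lookup.
context = {
    "var": {
        "value": FakeVariableValues({
            "region": "ap-southeast-1",
            "s3_bucket": "my-s3-bucket",
            "s3_key_prefix": "my-key",
        })
    }
}

rendered = Template(sql).render(**context)
print(rendered)
```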

answered 2021-10-21T11:13:58.790

You can use Airflow Variables: https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html

Airflow Jinja templating support: https://airflow.apache.org/docs/apache-airflow/stable/concepts/operators.html#concepts-jinja-templating

answered 2021-10-21T09:12:09.400