我想使用执行日期作为我的 sql 文件的参数:
我试过了
dt = '{{ ds }}'
s3_to_redshift = PostgresOperator(
task_id='s3_to_redshift',
postgres_conn_id='redshift',
sql='s3_to_redshift.sql',
params={'file': dt},
dag=dag
)
但它不起作用。
我想使用执行日期作为我的 sql 文件的参数:
我试过了
dt = '{{ ds }}'
s3_to_redshift = PostgresOperator(
task_id='s3_to_redshift',
postgres_conn_id='redshift',
sql='s3_to_redshift.sql',
params={'file': dt},
dag=dag
)
但它不起作用。
dt = '{{ ds }}'
不起作用,因为 Jinja(气流中使用的模板引擎)不处理整个 Dag 定义文件。
对于每个Operator
字段,Jinja 都会处理一些字段,这些字段是运算符本身定义的一部分。
在这种情况下,如果您像这样扩展,您可以将params
字段(实际上称为parameters
,确保更改它)模板化PostgresOperator
:
class MyPostgresOperator(PostgresOperator):
template_fields = ('sql','parameters')
现在你应该能够做到:
s3_to_redshift = MyPostgresOperator(
task_id='s3_to_redshift',
postgres_conn_id='redshift',
sql='s3_to_redshift.sql',
parameters={'file': '{{ ds }}'},
dag=dag
)
PostgresOperator / JDBCOperator 继承自 BaseOperator。
BaseOperator 的输入参数之一是 params:
self.params = params or {} # Available in templates!
因此,您应该能够在不创建新类的情况下使用它:(
即使 params 未包含在 template_fields 中)
t1 = JdbcOperator(
task_id='copy',
sql='copy.sql',
jdbc_conn_id='connection_name',
params={'schema_name':'public'},
dag=dag
)
SQL 语句 (copy.sql) 可能如下所示:
copy {{ params.schema_name }}.table_name
from 's3://.../table_name.csv'
iam_role 'arn:aws:iam::<acc_num>:role/<role_name>'
csv
IGNOREHEADER 1
笔记:
copy.sql 位于 DAG 所在的同一位置。
或者
您可以在“default_args”中定义“template_searchpath”变量
并指定模板文件所在文件夹的绝对路径。
例如:'template_searchpath':'/home/user/airflow/templates/'