15

我想使用执行日期作为我的 sql 文件的参数:

我试过了

dt = '{{ ds }}'

s3_to_redshift = PostgresOperator(
    task_id='s3_to_redshift',
    postgres_conn_id='redshift',
    sql='s3_to_redshift.sql',
    params={'file': dt},
    dag=dag
)

但它不起作用。

4

2 回答 2

31

dt = '{{ ds }}'

不起作用,因为 Jinja(气流中使用的模板引擎)不处理整个 Dag 定义文件。

对于每个Operator字段,Jinja 都会处理一些字段,这些字段是运算符本身定义的一部分。

在这种情况下,如果您像这样扩展,您可以将params字段(实际上称为parameters,确保更改它)模板化PostgresOperator

class MyPostgresOperator(PostgresOperator):
    template_fields = ('sql','parameters')

现在你应该能够做到:

s3_to_redshift = MyPostgresOperator(
    task_id='s3_to_redshift',
    postgres_conn_id='redshift',
    sql='s3_to_redshift.sql',
    parameters={'file': '{{ ds }}'},
    dag=dag
)
于 2017-06-14T15:12:27.430 回答
-1

PostgresOperator / JDBCOperator 继承自 BaseOperator。
BaseOperator 的输入参数之一是 params: self.params = params or {} # Available in templates!

因此,您应该能够在不创建新类的情况下使用它:(
即使 params 未包含在 template_fields 中) t1 = JdbcOperator( task_id='copy', sql='copy.sql', jdbc_conn_id='connection_name', params={'schema_name':'public'}, dag=dag )

SQL 语句 (copy.sql) 可能如下所示: copy {{ params.schema_name }}.table_name from 's3://.../table_name.csv' iam_role 'arn:aws:iam::<acc_num>:role/<role_name>' csv IGNOREHEADER 1

笔记:

copy.sql 位于 DAG 所在的同一位置。
或者
您可以在“default_args”中定义“template_searchpath”变量
并指定模板文件所在文件夹的绝对路径。
例如:'template_searchpath':'/home/user/airflow/templates/'

于 2018-12-04T05:39:38.367 回答