1

我想传递执行日期,它在变量 {{ ds }} 中。但是,我通过一个函数传递了它,它没有得到执行日期。

def get_spark_step_2(date):
      #logic in here
      return step

exec_date = '{{ ds }}'

step_adder2 = EmrAddStepsOperator(
    task_id='create_parquets',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    aws_conn_id='aws_default',
    steps=get_spark_step_2(exec_date),
    dag=dag
)

你知道我如何在上面的上下文中使用变量吗?

4

1 回答 1

2

创建一个扩展 EmrAddStepsOperator 的类,并创建steps一个模板化字段。

像这样的东西:

class MyEmrAddStepsOperator(EmrAddStepsOperator):

    template_fields = ['job_flow_id','steps']

EmrAddStepsOperator本身仅job_flow_id作为模板字段:

class EmrAddStepsOperator(BaseOperator):
    """
    An operator that adds steps to an existing EMR job_flow.
    :param job_flow_id: id of the JobFlow to add steps to
    :type job_flow_name: str
    :param aws_conn_id: aws connection to uses
    :type aws_conn_id: str
    :param steps: boto3 style steps to be added to the jobflow
    :type steps: list
    """
    template_fields = ['job_flow_id']

您只能ds在模板化的字段中使用宏(如 )。

于 2017-05-18T14:58:26.773 回答