19

我正在寻找一种允许动态设置给定 EmailOperator 任务发送的电子邮件内容的方法。理想情况下,我想让电子邮件内容依赖于 xcom 调用的结果,最好是通过 html_content 参数。

alert = EmailOperator(
    task_id=alertTaskID,
    to='please@dontreply.com',
    subject='Airflow processing report',
    html_content='raw content #2',
    dag=dag
)

我注意到 Airflow 文档说 xcom 调用可以嵌入到模板中。也许有一种方法可以使用指定任务 ID 上的模板来制定 xcom 拉取然后将结果作为 html_content 传递?谢谢

4

3 回答 3

14

使用PythonOperator+send_email代替:

from airflow.operators import PythonOperator
from airflow.utils.email import send_email


def email_callback(**kwargs):
    with open('/path/to.html') as f:
        content = f.read()
    send_email(
        to=[
            # emails
        ],
        subject='subject',
        html_content=content,
    )


email_task = PythonOperator(
    task_id='task_id',
    python_callable=email_callback,
    provide_context=True,
    dag=dag,
)
于 2017-11-23T10:39:24.160 回答
9

还不如自己回答这个问题。事实证明,使用 template+xcom 路线相当简单。此代码片段在已定义的 dag 的上下文中工作。它使用 BashOperator 而不是 EmailOperator,因为它更容易测试。

def pushparam(param, ds, **kwargs):
    kwargs['ti'].xcom_push(key='specificKey', value=param)
    return 

loadxcom = PythonOperator(
    task_id='loadxcom',
    python_callable=pushparam,
    provide_context=True,        
    op_args=['your_message_here'],
    dag=dag)

template2 = """
    echo "{{ params.my_param }}"
    echo "{{ task_instance.xcom_pull(task_ids='loadxcom', key='specificKey') }}"
"""
t5 = BashOperator(
    task_id='tt2',
    bash_command=template2,
    params={'my_param': 'PARAMETER1'},
    dag=dag)

可以使用以下命令在命令行上进行测试:

airflow test dag_name loadxcom 2015-12-31
airflow test dag_name tt2 2015-12-31

我最终将使用 EmailOperator 进行测试,如果它不起作用,请在此处添加一些内容......

于 2016-01-21T00:55:05.390 回答
8

对于那些正在寻找将 jinja 模板与 EmailOperator 一起使用的确切示例的人,这里有一个

from airflow.operators.email_operator import EmailOperator
from datetime import timedelta, datetime

email_task = EmailOperator(
    to='some@email.com',
    task_id='email_task',
    subject='Templated Subject: start_date {{ ds }}',
    params={'content1': 'random'},
    html_content="Templated Content: content1 - {{ params.content1 }}  task_key - {{ task_instance_key_str }} test_mode - {{ test_mode }} task_owner - {{ task.owner}} hostname - {{ ti.hostname }}",
    dag=dag)

您可以使用测试运行上述代码片段

airflow test dag_name email_task 2017-05-10
于 2017-05-11T15:21:30.343 回答