4

我想用我从任务传递的一个参数执行一个函数。

这是我的带有状态参数的函数:

def sns_notify(state):
    client = boto3.client('sns')
    if state == "failed":
        message = config.get('sns', 'message') + state
    else:
        message = config.get('sns', 'message') + state
    response = client.publish(TargetArn=config.get('sns', 'target_arn'),
                              Message=message,
                              Subject=config.get('sns', 'subject'))
    return response

这是我的以状态为参数的任务:

t1 = DummyOperator(task_id='Dummy-1', trigger_rule=TriggerRule.ALL_SUCCESS,
                   on_success_callback=sns_notify("ok"), dag=dag)

t2 = DummyOperator(task_id='Dummy-2', trigger_rule=TriggerRule.ONE_FAILED,
                   on_success_callback=sns_notify("failed"), dag=dag)

当我运行 dag 时,该功能不会停止发送邮件(例如)

4

2 回答 2

5

每次 DAG 由气流加载时,它都会执行sns_notify("ok"),因为您正在调用该函数。您只需要传递sns_notify将接收的函数指针context。请参阅文档:https ://airflow.apache.org/code.html

trigger_rule与依赖项任务的执行方式有关,因此与on_success_callback.

但是,我不确定如何将变量传递给此回调 - 来这里寻找答案!

于 2018-01-19T22:38:44.813 回答
2

Hoju指出了确切的错误。

你可以使用函数式编程来帮助解决这个问题。

from functools import partial
send_success_notification = partial(sns_notify, "OK")
t1 = DummyOperator(task_id='Dummy-1', trigger_rule=TriggerRule.ALL_SUCCESS,
                   on_success_callback=send_success_notification , dag=dag)

send_failure_notification = partial(sns_notify, "FAILED")
t2 = DummyOperator(task_id='Dummy-2', trigger_rule=TriggerRule.ONE_FAILED,
                   on_success_callback=send_failure_notification, dag=dag)
于 2020-12-04T20:45:00.500 回答