7

我尝试添加一个逻辑,当管道由于某些错误而终止时将发送松弛通知。我试图用ExitHandler. 但是,似乎ExitHandler不能依赖任何操作。你有什么好主意吗?

4

1 回答 1

11

我找到了一个使用ExitHandler. 我在下面发布我的代码,希望它可以帮助其他人。


def slack_notification(slack_channel: str, status: str, name: str, is_exit_handler: bool = False):
    """
    performs slack notifications
    """    
    send_slack_op = dsl.ContainerOp(
        name=name,
        image='wenmin.wu/slack-cli:latest',
        is_exit_handler=is_exit_handler,
        command=['sh', '-c'],
        arguments=["/send-message.sh -d {} '{}'".format(slack_channel, status)]
    )
    send_slack_op.add_env_variable(V1EnvVar(name = 'SLACK_CLI_TOKEN', value_from=V1EnvVarSource(config_map_key_ref=V1ConfigMapKeySelector(name='workspace-config', key='SLACK_CLI_TOKEN'))))
    return send_slack_op

@dsl.pipeline(
    name='forecasting-supply',
    description='forecasting supply ...'
)
def ml_pipeline(
    param1,
    param2,
    param3,
):
    exit_task = slack_notification(
        slack_channel = slack_channel,
        name = "supply-forecasting",
        status = "Kubeflow pipeline: {{workflow.name}} has {{workflow.status}}!",
        is_exit_handler = True
    )

    with dsl.ExitHandler(exit_task):
        # put other tasks here

于 2019-08-21T06:53:51.287 回答