6

我一直在尝试将 Luigi 集成为我们的工作流处理程序。目前我们正在使用 concourse,但是我们尝试做的许多事情在 concourse 中很麻烦,所以我们切换到 Luigi 作为我们的依赖管理器。到目前为止没有问题,工作流触发并正确执行。

当任务由于某种原因失败时,问题就会出现。这种情况特别是任务的需要块,但是需要注意所有情况。到目前为止,Luigi 优雅地处理了错误并将其写入 STDOUT。但是它仍然发出并退出代码 0,这意味着工作通过了。误报。

我一直试图让事件处理来解决这个问题,但我无法让它触发,即使是一个非常简单的工作:

@luigi.Task.event_handler(luigi.Event.FAILURE)
def mourn_failure(task, exception):
    with open('/root/luigi', 'a') as f:
        f.write("we got the exception!") #testing in concourse image
    sys.exit(luigi.retcodes.retcode().unhandled_exception)

class Test(luigi.Task):
    def requires(self):
        raise Exception()
        return []

    def run(self):
        pass

    def output(self):
        return []

然后在 python shell 中运行命令

luigi.run(main_task_cls=Test, local_scheduler=True)

引发异常,但偶数不会触发或其他什么。该文件没有被写入,退出代码仍然是 0。

另外,如果有什么不同,我在 /etc/luigi/client.cfg 有我的 luigi 配置,其中包含

[retcode]
already_running=10
missing_data=20
not_run=25
task_failed=30
scheduling_error=35
unhandled_exception=40

我不知道为什么事件处理程序不会触发,但不知何故我需要该过程因错误而失败。

4

3 回答 3

3

似乎问题在于您放置“引发异常”调用的位置。如果您将它放在 requires 函数中 - 它基本上在您的 Test 任务运行方法之前运行。因此,并不是您的 Test 任务失败了,而是它所依赖的任务(现在,为空......)。

例如,如果您将 raise 移动到运行,您的代码将按照您的预期运行。

    def run(self):
       print('start')
       raise Exception()

要处理依赖失败的情况(在这种情况下,在 requires 方法中引发异常),您可以添加另一种类型的 luigi 事件处理程序,BROKEN_TASK:luigi.Event.BROKEN_TASK。这将确保 luigi 代码发出您期望的返回代码(不同于 0)。

干杯!

于 2018-02-19T11:36:11.777 回答
1

如果您想在 中捕获异常requires(),请使用以下命令:

@luigi.Task.event_handler(luigi.Event.BROKEN_TASK)
def mourn_failure(task, exception):
    ...
于 2019-08-06T14:47:48.500 回答
0

如果我理解正确的话,你只是想让 luigi 在任务失败时返回一个错误代码,这个我有很多问题,但结果很简单,你只需要在命令行上用 luigi 运行它,不是蟒蛇。像这样:

luigi --module my_module MyTask

我不知道这是否也是你的问题,但我正在使用 python 运行,然后 luigi 忽略了 luigi.cfg 上的 retcodes。希望能帮助到你。

于 2018-03-05T19:48:44.743 回答