9

我正在使用 Luigi 启动一些管道。我们举一个简单的例子

task = myTask()
w = Worker(scheduler=CentralPlannerScheduler(), worker_processes=1)
w.add(task)
w.run()

现在假设myTask在执行期间引发异常。我所能拥有的只是来自 luigi 的显示异常的日志。

luigi 有什么方法可以传播它或至少返回一个failure状态?

然后,我将能够使我的程序根据该状态做出反应。

谢谢。

编辑 我在存储结果时忘记指定 luigi 的输出是针对数据库的。如果引发异常,则不会存储任何结果,但不会将异常传播到 luigi。我想知道路易吉是否可以选择拥有这个。

4

2 回答 2

21

来自文档

Luigi 有一个内置的事件系统,允许您注册事件的回调并从您自己的任务中触发它们。您既可以挂钩一些预定义的事件,也可以创建自己的事件。每个事件句柄都与一个 Task 类相关联,并且只能从该类或其子类触发。这使您可以轻松地仅订阅特定类的事件(例如,对于 hadoop 作业)。

例子:

import luigi

from my_tasks import MyTask


@MyTask.event_handler(luigi.Event.FAILURE)
def mourn_failure(task, exception):
    """Will be called directly after a failed execution
    of `run` on any MyTask subclass
    """

    do_something()


luigi.run()

Luigi 有很多活动可供您选择。您还可以查看此测试,以了解如何聆听和响应其他事件。

于 2015-10-28T16:37:49.973 回答
-1

您可以做的是将错误写入文件。例如,在您可能失败的任务中(我们称之为 TaskA):

x=""
try:
    do stuff
except:
    x="error!"
with open('errorfile.log','w') as f:
    f.write(x)

然后,在依赖于该错误的任务中,该任务将需要 TaskA。你可以这样做:

with open('errorfile.log','r') as f:
    if f.read()://if anything is in the error log from TaskA
        //error occurred
        do stuff
    else:
        do other stuff
于 2015-09-29T15:49:13.347 回答