问题标签 [luigi]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
1591 浏览

debugging - luigi:出于调试目的重新运行任务?

我正在编写一些 luigi 工作流程,并且正在尝试调试任务。为了做到这一点,我需要一遍又一遍地使用相同的参数重新运行这些任务,直到它们最终完成我想要的。

我知道 luigi 任务是幂等的,因此在给出与之前相同的输入时通常不会重新运行,这正是所需要的工作流调试和生产后所需要的。但是,在开发过程中,使用完全相同的输入和输出重新运行工作流是有用的——而且我认为这是必要的。

我知道我可以覆盖 complete() 方法以在开发过程中在每个任务中返回 False。但是,这会使任务处于未完成状态。

我正在寻找一种方法来设置我的工作流以某种“开发”或“调试”模式运行,这样我就可以一遍又一遍地运行和重新运行它以完成,即使所有任务都运行正确,直到我确定工作流程完全符合我的要求。

有没有办法在 luigi 中做到这一点?

先感谢您。

================稍后添加================

根据我在下面的评论,将输入参数更改为任务似乎不会导致它重新运行。只有当它的 output() 方法返回一个唯一值时,该任务才能重新运行。这似乎违背了“幂等”的定义,因为更改输入参数应该将真正的幂等任务视为一个新的、唯一的实体,而不管它是否碰巧返回与具有不同输入参数的另一个调用相同的输出。

下面的代码说明了这个问题。“x”参数确定 output() 方法返回的文件名,而“y”参数用于输出内容中,但不用于输出文件的名称。

如果我使用“--x 10 --y 20”和“--x 10 --y 30”调用我的工作流程,则第二次调用不会导致任何一个任务重新运行。我认为,这是不正确的行为。但是,如果我用“--x 10 --y 20”后跟“--x 11 --y 20”调用工作流,这两个任务确实会重新运行。

0 投票
1 回答
1907 浏览

oozie - 及时安排 Spark 作业

这是每天/每周安排 Spark 作业的推荐工具。1) Oozie 2) Luigi 3) Azkaban 4) Chronos 5) Airflow

提前致谢。

0 投票
3 回答
10145 浏览

python - 如何在 Python Luigi 中使用参数

如何将参数传递给 Luigi?如果我有一个名为 FileFinder.py 的 python 文件和一个名为 getFIles 的类:

class getFiles(luigi.Task):

我想将一个目录传递给这个类,例如:

C://Documents//fileName

然后在我的运行方法中使用这个参数

def run(self):

如何在命令行中运行它并添加参数以在我的代码中使用?我习惯于在命令行中运行这个文件,如下所示:

python FileFinder.py getFiles --local-scheduler

我应该在代码中添加什么来使用参数,以及如何将该参数添加到命令行参数中?

另外,作为这个问题的扩展,我将如何使用多个参数?或不同数据类型的参数,如字符串或列表?

0 投票
1 回答
606 浏览

python - luigi - pip install luigi 时构建轮子失败

0 投票
1 回答
1448 浏览

python - luigi 依赖项在运行时更改

我有一个luigi预处理任务,将我的原始数据拆分成更小的文件。然后这些文件将由实际管道处理。

所以关于参数,我想要求每个管道都有一个预处理文件 id 作为参数。但是,此文件 id 仅在预处理步骤中生成,因此仅在运行时才知道。为了说明我的想法,我提供了这个不起作用的代码:

包装任务Experiment应该

  1. 首先,以某种方式需要将原始数据拆分为文档

  2. 其次,要求实际管道具有获得的预处理文件ID。

中的随机输出文件数GenPipelineFiles表明这不能硬编码到Experiment'srequires中。

可能与此相关的一个问题是,一个luigi任务正确地只有一个输入目标和一个输出目标。可能有关如何对多个输出进行建模的注释GenPipelineFiles也可以解决该问题。

0 投票
2 回答
196 浏览

python - 在返回 'self' 之前调用 setattr

我怀疑这对我来说是一种 klugefest,但我正在使用 Luigi 和 Sciluigi 模块,它们在初始化返回“自我”之前设置了许多关键参数。如果我在返回 self 后尝试对这些参数进行粗暴处理,Luigi.Parameter 对象会以这样的方式掩盖它们,以至于我无法做我需要做的事情。

luigi 和 sciluigi 类(使用它们时)包含 no __init__。如果我尝试插入__init__或调用super(ChildClass, self).__init__(*args, **kwargs)...我会收到奇怪的错误“意外参数”错误。

所以 Sciluigi 类看起来像这样......

所以...我希望我可以通过 setattr PRIOR 动态设置一些参数,以实际返回“自我”。但是 setattr 需要对象。

我希望我能做类似的事情......

编辑:@查尔斯·达菲

好吧,我不确定哪些信息最有帮助。

第一个问题是;我无法添加初始化。实际代码如下,__init__添加了一个方法。如果我尝试运行它,我已经包含了结果错误。它同样的错误是如果我运行超级调用__init__

错误

第二个问题是:

如果我等待 self 返回,我将无法再区分(例如使用上面的代码)......

如果我执行type(out), type 报告参数只是一个字符串(不是 sciluigi.Parameter 对象),所以如果我尝试使用 `ìsinstance(out, sciluigi.Parameter)...它返回 False。

底线是:

我需要能够动态(以编程方式)设置 sciluigi.Parameter 对象,随后能够区分 sciluigi.Parameter() 对象变量(如out)和“真实” str() 对象(如in_target

我希望这是有道理的。

0 投票
2 回答
996 浏览

python - Python 任务调度器 Luigi 可以检测间接依赖吗?

精简版:

Python中是否有可以执行gmake的任务调度程序?特别是,我需要一个递归解决依赖关系的任务调度程序。我调查了 Luigi,但它似乎只解决了直接依赖关系。

长版:

我正在尝试构建一个以预定义顺序处理大量数据文件的工作流,后面的任务可能直接依赖于一些较早任务的输出,但反过来,这些输出的正确性甚至依赖于较早的任务.

例如,让我们考虑如下依赖映射:

A <- B <- C

当我从任务 C 请求结果时,Luigi 会自动调度 B,然后由于 B 依赖于 A,它会调度 A。所以最终的运行顺序是 [A, B, C]。每个任务都会创建一个正式的输出文件作为成功执行的标志。这对于第一次运行来说很好。

现在,假设我在任务 A 的输入数据中犯了一个错误。显然,我需要再次重新运行整个链。但是,简单地从 A 中删除输出文件是行不通的。因为 Luigi 看到 B 和 C 的输出,得出结论任务 C 的要求已经满足,不需要运行。我必须从依赖于 A 的所有任务中删除输出文件,以便它们再次运行。在简单的情况下,我必须删除 A、B 和 C 中的所有输出文件,以便 Luigi 检测到对 A 所做的更改。

这是一个非常不方便的功能。如果我有数十或数百个任务相互之间具有相当复杂的依赖关系,那么当其中一项任务需要重新运行时,真的很难判断哪些任务会受到影响。对于任务调度程序并具有解决依赖关系的能力,我希望 Luigi 能够像 GNU-Make 一样行事,其中递归检查依赖关系,并且当最深的源文件之一发生更改时,将重建最终目标。

我想知道是否有人可以就这个问题提供一些建议。我是否缺少 Luigi 中的一些关键功能?是否有其他任务调度程序可以充当 gmake?我对基于 Python 的包特别感兴趣,并且更喜欢那些支持 Windows 的包。

非常感谢!

0 投票
2 回答
5416 浏览

python - 在 Luigi 的任务之间传递 Python 对象?

我正在使用Spotify 的 Luigi在 Python 3.6 中编写我的第一个项目,以在管道中安排一些自然语言处理任务。

我注意到output()一个类的函数Task总是返回某种Target对象,它只是某个地方的某个文件,无论是本地的还是远程的。因为我的任务会产生更复杂的数据结构,比如解析树,所以将它们作为字符串写入文件并在之后再次读取它们是非常尴尬的。

因此,我想问一下是否有可能在管道内的任务之间传递 Python 对象?

0 投票
3 回答
2488 浏览

python - Python Luigi 中的事件处理

我一直在尝试将 Luigi 集成为我们的工作流处理程序。目前我们正在使用 concourse,但是我们尝试做的许多事情在 concourse 中很麻烦,所以我们切换到 Luigi 作为我们的依赖管理器。到目前为止没有问题,工作流触发并正确执行。

当任务由于某种原因失败时,问题就会出现。这种情况特别是任务的需要块,但是需要注意所有情况。到目前为止,Luigi 优雅地处理了错误并将其写入 STDOUT。但是它仍然发出并退出代码 0,这意味着工作通过了。误报。

我一直试图让事件处理来解决这个问题,但我无法让它触发,即使是一个非常简单的工作:

然后在 python shell 中运行命令

luigi.run(main_task_cls=Test, local_scheduler=True)

引发异常,但偶数不会触发或其他什么。该文件没有被写入,退出代码仍然是 0。

另外,如果有什么不同,我在 /etc/luigi/client.cfg 有我的 luigi 配置,其中包含

我不知道为什么事件处理程序不会触发,但不知何故我需要该过程因错误而失败。

0 投票
1 回答
1328 浏览

python - 如何在 Luigi 中启用动态需求?

我在 Luigi 中建立了一个任务管道。由于此管道将在不同的上下文中使用,因此可能需要在管道的开头或结尾包含更多任务,甚至需要在任务之间包含完全不同的依赖关系。

那时我想:“嘿,为什么要在我的配置文件中声明任务之间的依赖关系?”,所以我在我的 config.py 中添加了这样的内容:

我对在整个任务中堆积的参数感到恼火,所以在某些时候我只引入了一个参数 ,task_config每个参数Task都有,并且存储了每个必要的信息或数据run()。所以我PIPELINE_DEPENDENCIES直接放在那里。

最后,我会让Task我定义的每一个都继承自luigi.Task一个自定义的 Mixin 类,它将实现 dynamic requires(),它看起来像这样:

不幸的是,这不起作用,因为运行管道给了我以下信息:

和很多

虽然我不确定这是否相关。

所以: 1. 我的错误是什么?它可以修复吗?2.还有其他方法可以实现吗?