我正在编写一些 luigi 工作流程,并且正在尝试调试任务。为了做到这一点,我需要一遍又一遍地使用相同的参数重新运行这些任务,直到它们最终完成我想要的。
我知道 luigi 任务是幂等的,因此在给出与之前相同的输入时通常不会重新运行,这正是所需要的工作流调试和生产后所需要的。但是,在开发过程中,使用完全相同的输入和输出重新运行工作流是有用的——而且我认为这是必要的。
我知道我可以覆盖 complete() 方法以在开发过程中在每个任务中返回 False。但是,这会使任务处于未完成状态。
我正在寻找一种方法来设置我的工作流以某种“开发”或“调试”模式运行,这样我就可以一遍又一遍地运行和重新运行它以完成,即使所有任务都运行正确,直到我确定工作流程完全符合我的要求。
有没有办法在 luigi 中做到这一点?
先感谢您。
================稍后添加================
根据我在下面的评论,将输入参数更改为任务似乎不会导致它重新运行。只有当它的 output() 方法返回一个唯一值时,该任务才能重新运行。这似乎违背了“幂等”的定义,因为更改输入参数应该将真正的幂等任务视为一个新的、唯一的实体,而不管它是否碰巧返回与具有不同输入参数的另一个调用相同的输出。
下面的代码说明了这个问题。“x”参数确定 output() 方法返回的文件名,而“y”参数用于输出内容中,但不用于输出文件的名称。
如果我使用“--x 10 --y 20”和“--x 10 --y 30”调用我的工作流程,则第二次调用不会导致任何一个任务重新运行。我认为,这是不正确的行为。但是,如果我用“--x 10 --y 20”后跟“--x 11 --y 20”调用工作流,这两个任务确实会重新运行。
#!/usr/bin/python3
# -*- python -*-
import luigi
class Child(luigi.Task):
x = luigi.Parameter()
y = luigi.Parameter()
def requires(self):
return []
def output(self):
return luigi.LocalTarget("child_{}.txt".format(self.x))
def run(self):
with self.output().open('w') as f:
f.write('{} {}\n'.format(self.x, self.y))
class Parent(luigi.Task):
x = luigi.Parameter()
y = luigi.Parameter()
def requires(self):
return [ Child(self.x, self.y) ]
def output(self):
return luigi.LocalTarget("parent_{}.txt".format(self.x))
def run(self):
with self.input()[0].open() as fin, self.output().open('w') as fout:
for line in fin:
fout.write("from command line: --x {} --y {}, from child: {}\n".format(self.x, self.y, line.strip()))
if __name__ == '__main__':
luigi.run()