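As far as I understand, a luigi.Target can either exist or not. Therefore, if a luigi.Target exists, it won't be recomputed.
I'm looking for a way to force recomputation of a task if one of its dependencies is modified, or if the code of one of the tasks changes.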
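One way to accomplish this is to override the complete(...) method.
Just implement a function that checks your constraint, and return False if you want the task to be recomputed.
For example, to force recomputation when a dependency has been updated, you could do: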
    def complete(self):
        """Flag this task as incomplete if any requirement is incomplete or has been updated more recently than this task"""
        import os
        import time

        def mtime(path):
            # NOTE: time.ctime() returns a string, so the comparison below is
            # lexical; a later reply in this thread returns the float from
            # os.path.getmtime() directly instead.
            return time.ctime(os.path.getmtime(path))

        # assuming 1 output
        if not os.path.exists(self.output().path):
            return False

        self_mtime = mtime(self.output().path)

        # the below assumes a list of requirements, each with a list of outputs. YMMV
        for el in self.requires():
            if not el.complete():
                return False
            for output in el.output():
                if mtime(output.path) > self_mtime:
                    return False

        return True
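This will return False when any requirement is incomplete, when any requirement has been modified more recently than the current task, or when the output of the current task does not exist.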
Detecting when code has changed is harder. You could use a similar scheme (checking mtime), but it would be hit-or-miss unless every task has its own file.
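A minimal sketch of that mtime scheme for code changes, assuming each task class lives in its own source file. The helper names `source_path_of` and `is_older_than` are made up for illustration and are not part of Luigi; only `inspect.getsourcefile` and `os.path.getmtime` are standard library calls.

```python
import inspect
import os


def source_path_of(cls):
    """Return the path of the file that defines `cls` (hypothetical helper)."""
    return inspect.getsourcefile(cls)


def is_older_than(output_path, source_path):
    """True if output_path is missing or was last modified before source_path."""
    if not os.path.exists(output_path):
        return True
    # getmtime() returns floats, so this is a chronological comparison.
    return os.path.getmtime(output_path) < os.path.getmtime(source_path)
```

Inside a custom complete() this could be used as `if is_older_than(self.output().path, source_path_of(type(self))): return False`. Note that any edit to the file, even a whitespace change or an edit to an unrelated task in the same file, would trigger a re-run, which is why the approach is hit-or-miss.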
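Because you can override complete, you can implement whatever recomputation logic you want. If you want a particular complete method for many tasks, I'd recommend subclassing luigi.Task, implementing your custom complete there, and then inheriting your tasks from that subclass.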
I'm late to the game, but here's a mixin that improves on the accepted answer to support multiple input / output files.
    import os
    import time

    class MTimeMixin:
        """
        Mixin that flags a task as incomplete if any requirement
        is incomplete or has been updated more recently than this task
        This is based on http://stackoverflow.com/a/29304506, but extends
        it to support multiple input / output dependencies.
        """

        def complete(self):
            def to_list(obj):
                if isinstance(obj, (tuple, list)):
                    return obj
                else:
                    return [obj]

            def mtime(path):
                # NOTE: returns a string; see the reply below about returning a float
                return time.ctime(os.path.getmtime(path))

            if not all(os.path.exists(out.path) for out in to_list(self.output())):
                return False

            self_mtime = min(mtime(out.path) for out in to_list(self.output()))

            # the below assumes a list of requirements, each with a list of outputs. YMMV
            for el in to_list(self.requires()):
                if not el.complete():
                    return False
                for output in to_list(el.output()):
                    if mtime(output.path) > self_mtime:
                        return False

            return True
To use it, you would just declare your class using, for example, class MyTask(MTimeMixin, luigi.Task).
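The order of the base classes matters here, because Python resolves complete() left to right. A small sketch with stand-in classes: `BaseTask` and `AlwaysStaleMixin` are hypothetical names used only to illustrate the lookup order, not Luigi classes.

```python
class BaseTask:
    """Hypothetical stand-in for luigi.Task; not part of Luigi."""

    def complete(self):
        return True  # pretend the output already exists


class AlwaysStaleMixin:
    """Hypothetical mixin whose complete() always asks for a re-run."""

    def complete(self):
        return False


class MixinFirst(AlwaysStaleMixin, BaseTask):
    pass


class MixinLast(BaseTask, AlwaysStaleMixin):
    pass


print(MixinFirst().complete())  # False: the mixin's complete() wins
print(MixinLast().complete())   # True: the base class shadows the mixin
```

So the mixin should be listed before luigi.Task; otherwise its complete() is never called.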
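The above code works well for me, except that I believe that for a proper timestamp comparison mtime(path) must return a float instead of a string ("Sat" > "Mon" ... [sic]). Thus, simply use
    def mtime(path):
        return os.path.getmtime(path)
instead of:
    def mtime(path):
        return time.ctime(os.path.getmtime(path))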
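To see why the string form misleads, here is a short sketch with two fixed timestamps chosen purely for illustration (they are not from the thread). The weekdays were picked so that the string comparison comes out backwards whatever the local time zone is.

```python
import time

# Two fixed timestamps, chosen for illustration only.
earlier = 1577883600  # 2020-01-01 13:00 UTC (a Wednesday)
later = 1578056400    # 2020-01-03 13:00 UTC (a Friday)

# Numbers compare chronologically.
print(earlier < later)  # True

# ctime() strings begin with the weekday name, so they compare alphabetically:
# "Wed ..." sorts after "Fri ...", although Wednesday came first.
print(time.ctime(earlier) > time.ctime(later))  # True
```

With strings, an older dependency can look newer than the task's output (or the reverse), so the task is re-run or skipped more or less at random.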
Regarding the Mixin suggestion from Shilad Sen posted below, consider this example:
    # Filename: run_luigi.py
    import luigi
    from MTimeMixin import MTimeMixin

    class PrintNumbers(luigi.Task):

        def requires(self):
            return []

        def output(self):
            return luigi.LocalTarget("numbers_up_to_10.txt")

        def run(self):
            with self.output().open('w') as f:
                for i in range(1, 11):
                    f.write("{}\n".format(i))

    class SquaredNumbers(MTimeMixin, luigi.Task):

        def requires(self):
            return [PrintNumbers()]

        def output(self):
            return luigi.LocalTarget("squares.txt")

        def run(self):
            with self.input()[0].open() as fin, self.output().open('w') as fout:
                for line in fin:
                    n = int(line.strip())
                    out = n * n
                    fout.write("{}:{}\n".format(n, out))

    if __name__ == '__main__':
        luigi.run()
where MTimeMixin is as in the post above. I run the task once using
    luigi --module run_luigi SquaredNumbers
Then I touch the file numbers_up_to_10.txt and run the task again. Luigi then gives the following complaint:
      File "c:\winpython-64bit-3.4.4.6qt5\python-3.4.4.amd64\lib\site-packages\luigi-2.7.1-py3.4.egg\luigi\local_target.py", line 40, in move_to_final_destination
        os.rename(self.tmp_path, self.path)
    FileExistsError: [WinError 183] Cannot create a file when that file already exists: 'squares.txt-luigi-tmp-5391104487' -> 'squares.txt'
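This may just be a Windows problem and not an issue on Linux, where "mv a b" may simply delete the old b if it already exists and is not write-protected. We can fix this with the following patch to Luigi/local_target.py: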
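    # assumes the time module is imported in local_target.py
    def move_to_final_destination(self):
        # move the existing output aside under a timestamped name first
        if os.path.exists(self.path):
            os.rename(self.path, self.path + time.strftime("_%Y%m%d%H%M%S.txt"))
        os.rename(self.tmp_path, self.path)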
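As a possible alternative, Python 3.3+ provides os.replace(), which overwrites an existing destination on Windows as well as on POSIX. The sketch below is a standalone illustration and not how Luigi itself is implemented: the free function and the temporary file names are assumptions made for the demo. Unlike the patch above, it does not keep a copy of the old output.

```python
import os
import tempfile


def move_to_final_destination(tmp_path, final_path):
    """Hypothetical standalone helper: move tmp_path over final_path."""
    # os.replace() overwrites final_path if it already exists.
    os.replace(tmp_path, final_path)


with tempfile.TemporaryDirectory() as d:
    final_path = os.path.join(d, "squares.txt")
    tmp_path = final_path + "-luigi-tmp-demo"

    with open(final_path, "w") as f:
        f.write("old\n")
    with open(tmp_path, "w") as f:
        f.write("new\n")

    move_to_final_destination(tmp_path, final_path)

    with open(final_path) as f:
        result = f.read()
    tmp_still_exists = os.path.exists(tmp_path)

print(repr(result))      # 'new\n'
print(tmp_still_exists)  # False
```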
For completeness, here is the Mixin again as a separate file, taken from the other post:
    import os

    class MTimeMixin:
        """
        Mixin that flags a task as incomplete if any requirement
        is incomplete or has been updated more recently than this task
        This is based on http://stackoverflow.com/a/29304506, but extends
        it to support multiple input / output dependencies.
        """

        def complete(self):
            def to_list(obj):
                if isinstance(obj, (tuple, list)):
                    return obj
                else:
                    return [obj]

            def mtime(path):
                return os.path.getmtime(path)

            if not all(os.path.exists(out.path) for out in to_list(self.output())):
                return False

            self_mtime = min(mtime(out.path) for out in to_list(self.output()))

            # the below assumes a list of requirements, each with a list of outputs. YMMV
            for el in to_list(self.requires()):
                if not el.complete():
                    return False
                for output in to_list(el.output()):
                    if mtime(output.path) > self_mtime:
                        return False

            return True