
As far as I know, a luigi.Target either exists or it doesn't. So if a luigi.Target exists, it will not be recomputed.

I'm looking for a way to force a task to be recomputed if one of its dependencies has been modified, or if the code of one of the tasks has changed.


4 Answers


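One way to achieve this is to override the complete(...) method.

The documentation for complete is straightforward:

Just implement a function that checks your constraint and returns False if you want the task to be recomputed.

For example, to force recomputation when a dependency has been updated, you could do: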

def complete(self):
    """Flag this task as incomplete if any requirement is incomplete or has been updated more recently than this task"""
    import os
    import time

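    def mtime(path):
        # note: time.ctime() returns a string such as 'Sat Mar 28 ...', so '>' below compares text, not times
        return time.ctime(os.path.getmtime(path))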

    # assuming 1 output
    if not os.path.exists(self.output().path):
        return False

    self_mtime = mtime(self.output().path) 

    # the below assumes a list of requirements, each with a list of outputs. YMMV
    for el in self.requires():
        if not el.complete():
            return False
        for output in el.output():
            if mtime(output.path) > self_mtime:
                return False

    return True

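This returns False when any requirement is incomplete, when any requirement has been modified more recently than the current task, or when the current task's output does not exist.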
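The same rule can be checked outside Luigi. The sketch below is a hypothetical helper (`is_stale` is not part of Luigi) that works on plain file paths and compares modification times as floats returned by `os.path.getmtime`, treating the oldest output as the task's timestamp.

```python
import os
from typing import Iterable


def is_stale(output_paths: Iterable[str], input_paths: Iterable[str]) -> bool:
    """Return True if the outputs need rebuilding.

    Hypothetical helper: outputs are stale if any is missing, or if any
    input was modified after the oldest output.
    """
    outputs = list(output_paths)
    # No outputs declared, or any output missing: the task must run.
    if not outputs or not all(os.path.exists(p) for p in outputs):
        return True
    # The oldest output bounds how fresh the task's result is.
    oldest_output = min(os.path.getmtime(p) for p in outputs)
    # Float timestamps compare chronologically.
    return any(os.path.getmtime(p) > oldest_output for p in input_paths)
```

For example, `is_stale(["squares.txt"], ["numbers_up_to_10.txt"])` would return True after `numbers_up_to_10.txt` is touched.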

Detecting when code has changed is harder. You could use a similar scheme (checking mtime), but it would be hit-and-miss unless every task has its own file.
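One alternative for code changes, not taken from the answers here, is to store a hash of the task's source next to its output and treat the task as incomplete when the hash differs. The sketch below assumes you can get the source text (for example with `inspect.getsource(type(self))` inside a Luigi task); `source_fingerprint`, `code_changed`, `write_stamp` and the `.codehash` stamp-file convention are all hypothetical.

```python
import hashlib
import os


def source_fingerprint(source: str) -> str:
    """Hex SHA-256 digest of a task's source text."""
    return hashlib.sha256(source.encode("utf-8")).hexdigest()


def code_changed(stamp_path: str, source: str) -> bool:
    """True if no stamp exists yet or the stored hash differs from the current source."""
    if not os.path.exists(stamp_path):
        return True
    with open(stamp_path, encoding="utf-8") as f:
        return f.read().strip() != source_fingerprint(source)


def write_stamp(stamp_path: str, source: str) -> None:
    """Record the current source hash; intended to be called after run() succeeds."""
    with open(stamp_path, "w", encoding="utf-8") as f:
        f.write(source_fingerprint(source))
```

A task's complete() could then return False when `code_changed(self.output().path + ".codehash", inspect.getsource(type(self)))` is True, with run() calling `write_stamp` at the end. Hashing only the task class misses changes in helper functions it calls.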

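Because complete can be overridden, you can implement any logic you want for deciding when to recompute. If you want to use a particular complete method for many tasks, I'd recommend subclassing luigi.Task, implementing your custom complete there, and then having your tasks inherit from that subclass.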

answered 2015-03-27T15:34:39.120

I'm late to the party, but here is a mixin that improves on the accepted answer to support multiple input/output files.

import os
import time

class MTimeMixin:
    """
        Mixin that flags a task as incomplete if any requirement
        is incomplete or has been updated more recently than this task
        This is based on http://stackoverflow.com/a/29304506, but extends
        it to support multiple input / output dependencies.
    """

    def complete(self):
        def to_list(obj):
            if type(obj) in (type(()), type([])):
                return obj
            else:
                return [obj]

        def mtime(path):
            return time.ctime(os.path.getmtime(path))

        if not all(os.path.exists(out.path) for out in to_list(self.output())):
            return False

        self_mtime = min(mtime(out.path) for out in to_list(self.output()))

        # the below assumes a list of requirements, each with a list of outputs. YMMV
        for el in to_list(self.requires()):
            if not el.complete():
                return False
            for output in to_list(el.output()):
                if mtime(output.path) > self_mtime:
                    return False

        return True
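`to_list` only unwraps lists and tuples, but Luigi also lets `requires()` return a dict of tasks. The sketch below is a Luigi-free stand-in (`flatten_targets` is a hypothetical name) that also walks dicts and nested containers; Luigi ships a similar helper, `luigi.task.flatten`, which may be the better choice inside a real task.

```python
from typing import Any, List


def flatten_targets(obj: Any) -> List[Any]:
    """Flatten None, a single item, or nested lists/tuples/dicts into a flat list."""
    if obj is None:
        return []
    if isinstance(obj, dict):
        # Only the values are tasks/targets; the keys are labels.
        obj = list(obj.values())
    if isinstance(obj, (list, tuple)):
        flat: List[Any] = []
        for item in obj:
            flat.extend(flatten_targets(item))
        return flat
    # Anything else (a single Task or Target) is a leaf.
    return [obj]
```

Swapping `to_list(...)` for `flatten_targets(...)` in complete() would keep the rest of the mixin unchanged.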

To use it, just add it as a base class, e.g. class MyTask(MTimeMixin, luigi.Task).
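The mixin has to come before `luigi.Task` in the base list, because Python looks up `complete` along the method resolution order (MRO), left to right. The Luigi-free sketch below uses a hypothetical `BaseTask` standing in for `luigi.Task` and `MTimeLikeMixin` standing in for `MTimeMixin` to show the difference.

```python
class BaseTask:
    """Hypothetical stand-in for luigi.Task and its default complete()."""

    def complete(self) -> str:
        return "default existence check"


class MTimeLikeMixin:
    """Stand-in for MTimeMixin; its complete() should take precedence."""

    def complete(self) -> str:
        return "mtime check"


class MixinFirst(MTimeLikeMixin, BaseTask):
    pass


class MixinLast(BaseTask, MTimeLikeMixin):
    pass


print(MixinFirst().complete())  # mtime check
print(MixinLast().complete())   # default existence check
print([c.__name__ for c in MixinFirst.__mro__])  # ['MixinFirst', 'MTimeLikeMixin', 'BaseTask', 'object']
```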

answered 2016-06-18T04:13:47.480

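The code above works well for me, except that I believe a correct timestamp comparison requires mtime(path) to return a float rather than a string ("Sat" > "Mon"... [sic]). So, simply use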

def mtime(path):
    return os.path.getmtime(path)

instead of:

def mtime(path):
    return time.ctime(os.path.getmtime(path))
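To see the "Sat" > "Mon" problem concretely, the sketch below formats two fixed UTC instants, Saturday 2016-08-27 and the following Monday, in the same layout `time.ctime` produces. It uses `time.asctime(time.gmtime(...))` instead of `time.ctime` only so the result does not depend on the local time zone.

```python
import calendar
import time

# Two fixed instants in UTC: a Saturday and the Monday two days later.
saturday = calendar.timegm((2016, 8, 27, 12, 0, 0))
monday = calendar.timegm((2016, 8, 29, 12, 0, 0))

# ctime-style strings, rendered in UTC so the output is reproducible.
sat_str = time.asctime(time.gmtime(saturday))  # 'Sat Aug 27 12:00:00 2016'
mon_str = time.asctime(time.gmtime(monday))    # 'Mon Aug 29 12:00:00 2016'

# String comparison is alphabetical: 'S' > 'M', so Saturday looks newer.
print(sat_str > mon_str)  # True (wrong order)
# Numeric timestamps compare chronologically.
print(saturday > monday)  # False (correct order)
```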
answered 2016-08-30T08:42:55.620

Regarding the Mixin suggested by Shilad Sen above, consider the following example:

# Filename: run_luigi.py
import luigi
from MTimeMixin import MTimeMixin

class PrintNumbers(luigi.Task):

    def requires(self):
        return []

    def output(self):
        return luigi.LocalTarget("numbers_up_to_10.txt")

    def run(self):
        with self.output().open('w') as f:
            for i in range(1, 11):
                f.write("{}\n".format(i))

class SquaredNumbers(MTimeMixin, luigi.Task):

    def requires(self):
        return [PrintNumbers()]

    def output(self):
        return luigi.LocalTarget("squares.txt")

    def run(self):
        with self.input()[0].open() as fin, self.output().open('w') as fout:
            for line in fin:
                n = int(line.strip())
                out = n * n
                fout.write("{}:{}\n".format(n, out))

if __name__ == '__main__':
    luigi.run()

where MTimeMixin is as in the post above. I run the task once with

luigi --module run_luigi SquaredNumbers

Then I touch the file numbers_up_to_10.txt and run the task again. Luigi then complains as follows:

  File "c:\winpython-64bit-3.4.4.6qt5\python-3.4.4.amd64\lib\site-packages\luigi-2.7.1-py3.4.egg\luigi\local_target.py", line 40, in move_to_final_destination
    os.rename(self.tmp_path, self.path)
FileExistsError: [WinError 183] Cannot create a file when that file already exists: 'squares.txt-luigi-tmp-5391104487' -> 'squares.txt'

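This may just be a Windows issue and not a Linux one, where "mv a b" would probably just replace the old b if it already exists and is not write-protected. We can work around it with the following patch to luigi/local_target.py: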

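def move_to_final_destination(self):
    # patched method in luigi/local_target.py (the module needs `import os` and `import time`)
    if os.path.exists(self.path):
        # keep the previous output under a timestamped backup name instead of failing
        os.rename(self.path, self.path + time.strftime("_%Y%m%d%H%M%S.txt"))
    os.rename(self.tmp_path, self.path)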
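On Python 3.3 and later, a possibly simpler alternative to patching in a backup rename is `os.replace`, which overwrites an existing destination file on Windows as well as on POSIX, whereas `os.rename` raises `FileExistsError` on Windows. The sketch below is a hypothetical free-function version of the move step (not Luigi's actual method), and unlike the patch above it discards the old output instead of keeping a timestamped copy.

```python
import os


def move_to_final_destination(tmp_path: str, final_path: str) -> None:
    """Move the finished temp file into place, overwriting any previous output.

    os.replace silently replaces an existing destination file (given
    permission) on both Windows and POSIX systems.
    """
    os.replace(tmp_path, final_path)
```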

For completeness, here is the Mixin again as a separate file, taken from the other post:

import os

class MTimeMixin:
    """
        Mixin that flags a task as incomplete if any requirement
        is incomplete or has been updated more recently than this task
        This is based on http://stackoverflow.com/a/29304506, but extends
        it to support multiple input / output dependencies.
    """

    def complete(self):
        def to_list(obj):
            if type(obj) in (type(()), type([])):
                return obj
            else:
                return [obj]

        def mtime(path):
            return os.path.getmtime(path)

        if not all(os.path.exists(out.path) for out in to_list(self.output())):
            return False

        self_mtime = min(mtime(out.path) for out in to_list(self.output()))

        # the below assumes a list of requirements, each with a list of outputs. YMMV
        for el in to_list(self.requires()):
            if not el.complete():
                return False
            for output in to_list(el.output()):
                if mtime(output.path) > self_mtime:
                    return False

        return True
answered 2017-12-15T21:50:13.253