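As far as I know, a luigi.Target either exists or it doesn't. Therefore, if a luigi.Target exists, it won't be recomputed.

One way to force recomputation when a dependency has changed is to override the task's complete() method: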



def complete(self):
    """Flag this task as incomplete if any requirement is incomplete or has been updated more recently than this task"""
    import os

    def mtime(path):
        # Return the raw float timestamp so comparisons are numeric;
        # time.ctime() strings do not sort chronologically.
        return os.path.getmtime(path)

    # assuming 1 output
    if not os.path.exists(self.output().path):
        return False

    self_mtime = mtime(self.output().path) 

    # the below assumes a list of requirements, each with a list of outputs. YMMV
    for el in self.requires():
        if not el.complete():
            return False
        for output in el.output():
            if mtime(output.path) > self_mtime:
                return False

    return True




I'm late to the game, but here's a mixin that improves the accepted answer to support multiple input/output files.

import os

class MTimeMixin:
    """
    Mixin that flags a task as incomplete if any requirement
    is incomplete or has been updated more recently than this task.
    This is based on http://stackoverflow.com/a/29304506, but extends
    it to support multiple input / output dependencies.
    """

    def complete(self):
        def to_list(obj):
            # Wrap a single task/target in a list so it can be iterated uniformly
            if isinstance(obj, (tuple, list)):
                return obj
            return [obj]

        def mtime(path):
            # Float timestamp, so the comparisons below are numeric
            return os.path.getmtime(path)

        if not all(os.path.exists(out.path) for out in to_list(self.output())):
            return False

        self_mtime = min(mtime(out.path) for out in to_list(self.output()))

        # the below assumes a list of requirements, each with a list of outputs. YMMV
        for el in to_list(self.requires()):
            if not el.complete():
                return False
            for output in to_list(el.output()):
                if mtime(output.path) > self_mtime:
                    return False

        return True

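To use it, you would just declare your class using, for example, class MyTask(MTimeMixin, luigi.Task). The mixin has to be listed first so that its complete() takes precedence over the one from luigi.Task.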
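
The timestamp rule the mixin applies can be tried on plain files without Luigi. The sketch below is only an illustration: `is_up_to_date` is a hypothetical helper name, not part of Luigi, and it assumes every input and output is a local file path.

```python
import os
import tempfile
import time


def is_up_to_date(output_paths, input_paths):
    """Return True if every output exists and none is older than any input.

    Hypothetical helper mirroring the mixin's rule on plain file paths.
    """
    if not all(os.path.exists(p) for p in output_paths):
        return False
    oldest_output = min(os.path.getmtime(p) for p in output_paths)
    return all(os.path.getmtime(p) <= oldest_output for p in input_paths)


def demo():
    with tempfile.TemporaryDirectory() as tmp:
        src = os.path.join(tmp, "numbers.txt")
        dst = os.path.join(tmp, "squares.txt")
        for path in (src, dst):
            with open(path, "w") as f:
                f.write("x\n")
        now = time.time()
        # Set explicit timestamps instead of sleeping: input older than output
        os.utime(src, (now - 10, now - 10))
        os.utime(dst, (now, now))
        fresh = is_up_to_date([dst], [src])
        # "Touch" the input so it becomes newer than the output
        os.utime(src, (now + 10, now + 10))
        stale = not is_up_to_date([dst], [src])
        return fresh, stale


if __name__ == "__main__":
    print(demo())
```

Taking the minimum over the output timestamps is the conservative choice: a task with several outputs counts as stale as soon as its oldest output is older than any input.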

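Note that for the timestamp comparison to be valid, mtime(path) must return a float rather than a string, so it should be

def mtime(path):
    return os.path.getmtime(path)

and not

def mtime(path):
    return time.ctime(os.path.getmtime(path))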
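
To see why the string form is unsafe, compare two timestamps one day apart. This sketch uses time.asctime(time.gmtime(t)), which produces the same text layout as time.ctime(t) but in UTC, so the result does not depend on the local time zone; `as_text` is a name made up for this illustration.

```python
import time


def as_text(timestamp):
    # Same layout as time.ctime(), but rendered in UTC for reproducibility
    return time.asctime(time.gmtime(timestamp))


older = 0        # Thursday, 1 January 1970, 00:00:00 UTC
newer = 86400    # Friday, 2 January 1970, 00:00:00 UTC

# Numeric comparison agrees with chronology
numeric_ok = older < newer

# String comparison is alphabetical: "Thu..." sorts after "Fri..."
text_says_older_is_newer = as_text(older) > as_text(newer)

print(as_text(older), "|", as_text(newer))
print(numeric_ok, text_says_older_is_newer)
```

With strings, the outcome of the check depends on the weekday and month names rather than on which file is actually newer.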

Regarding the Mixin suggestion posted by Shilad Sen, consider the following example:

# Filename: run_luigi.py
import luigi
from MTimeMixin import MTimeMixin

class PrintNumbers(luigi.Task):

    def requires(self):
        return []

    def output(self):
        return luigi.LocalTarget("numbers_up_to_10.txt")

    def run(self):
        with self.output().open('w') as f:
            for i in range(1, 11):
                f.write("{}\n".format(i))

class SquaredNumbers(MTimeMixin, luigi.Task):

    def requires(self):
        return [PrintNumbers()]

    def output(self):
        return luigi.LocalTarget("squares.txt")

    def run(self):
        with self.input()[0].open() as fin, self.output().open('w') as fout:
            for line in fin:
                n = int(line.strip())
                out = n * n
                fout.write("{}:{}\n".format(n, out))

if __name__ == '__main__':
    luigi.run()

where MTimeMixin is the same as in the post above. I run the task once using

luigi --module run_luigi SquaredNumbers

Then I touch the file numbers_up_to_10.txt and run the task again. Luigi then complains as follows:

  File "c:\winpython-64bit-\python-3.4.4.amd64\lib\site-packages\luigi-2.7.1-py3.4.egg\luigi\local_target.py", line 40, in move_to_final_destination
    os.rename(self.tmp_path, self.path)
FileExistsError: [WinError 183] Cannot create a file when that file already exists: 'squares.txt-luigi-tmp-5391104487' -> 'squares.txt'

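This may be a Windows-only problem and not an issue on Linux, where "mv a b" may simply replace the old b if it already exists and is not write-protected. (Python's os.rename behaves the same way: on POSIX it silently replaces an existing destination file, while on Windows it raises FileExistsError.) We can work around this with the following patch to luigi/local_target.py: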

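def move_to_final_destination(self):
    # Needs "import time" at the top of local_target.py
    if os.path.exists(self.path):
        # Move the old output aside under a timestamped name instead of overwriting it
        os.rename(self.path, self.path + time.strftime("_%Y%m%d%H%M%S.txt"))
    os.rename(self.tmp_path, self.path)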
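
If keeping a timestamped copy of the old output is not needed, an alternative is os.replace (available since Python 3.3), which overwrites an existing destination on Windows as well as on POSIX. The following is a minimal sketch on plain files rather than a patch to Luigi itself; `move_overwriting` and the temporary file name are hypothetical.

```python
import os
import tempfile


def move_overwriting(tmp_path, final_path):
    # os.replace overwrites final_path if it exists, on Windows and POSIX alike
    os.replace(tmp_path, final_path)


def demo():
    with tempfile.TemporaryDirectory() as tmp:
        final_path = os.path.join(tmp, "squares.txt")
        tmp_path = final_path + "-luigi-tmp-example"
        with open(final_path, "w") as f:
            f.write("old\n")
        with open(tmp_path, "w") as f:
            f.write("new\n")
        move_overwriting(tmp_path, final_path)
        with open(final_path) as f:
            content = f.read()
        # The temporary file is gone and the final file holds the new content
        return content, os.path.exists(tmp_path)


if __name__ == "__main__":
    print(demo())
```

The trade-off is that the previous output is lost, whereas the patch above keeps it under a timestamped name.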

For completeness, here is the Mixin again as a separate file (MTimeMixin.py), from the other post:

import os

class MTimeMixin:
    """
    Mixin that flags a task as incomplete if any requirement
    is incomplete or has been updated more recently than this task.
    This is based on http://stackoverflow.com/a/29304506, but extends
    it to support multiple input / output dependencies.
    """

    def complete(self):
        def to_list(obj):
            # Wrap a single task/target in a list so it can be iterated uniformly
            if isinstance(obj, (tuple, list)):
                return obj
            return [obj]

        def mtime(path):
            return os.path.getmtime(path)

        if not all(os.path.exists(out.path) for out in to_list(self.output())):
            return False

        self_mtime = min(mtime(out.path) for out in to_list(self.output()))

        # the below assumes a list of requirements, each with a list of outputs. YMMV
        for el in to_list(self.requires()):
            if not el.complete():
                return False
            for output in to_list(el.output()):
                if mtime(output.path) > self_mtime:
                    return False

        return True
