14

目前,我有一堆 luigi 任务排在一起,有一个简单的依赖链(a -> b -> c -> d)。d首先执行,a最后执行。a是被触发的任务。

除了a返回一个luigi.LocalTarget()对象之外的所有目标都具有一个泛型luigi.Parameter(),它是一个字符串(包含日期和时间)。在 luigi 中央服务器(已启用历史记录)上运行。

问题是,当我重新运行上述任务时a,luigi 检查历史记录并查看该特定任务之前是否已运行,如果它的状态为 DONE,它不会运行任务(d在这种情况下)并且我不能这样,更改字符串无济于事(向其添加了随机微秒)。如何强制运行任务?

4

6 回答 6

14

首先评论:Luigi 任务是幂等的。如果您使用相同的参数值运行任务,无论您运行多少次,它都必须始终返回相同的输出。所以多次运行它是没有意义的。这让 Luigi 变得强大:如果你有一个大任务,需要花费大量时间来完成很多事情,但它在某个地方失败了,你将不得不从头开始重新运行它。如果您将其拆分为较小的任务,运行它并失败,您只需运行管道中的其余任务。

当您运行任务时,Luigi 会检查该任务的输出以查看它们是否存在。如果他们不这样做,Luigi 会检查它所依赖的任务的输出。如果它们存在,那么它将只运行当前任务并生成输出Target。如果依赖项输出不存在,那么它将运行该任务。

所以,如果你想重新运行一个任务,你必须删除它的Target输出。如果你想重新运行整个管道,你必须删除任务在级联中依赖的所有任务的所有输出。

Luigi 存储库中正在对此问题进行讨论。看看这个评论,因为它会指向一些脚本,用于获取给定任务的输出目标并删除它们。

于 2016-01-06T17:21:07.760 回答
4

我通常通过覆盖来做到这一点complete()

class BaseTask(luigi.Task):

    force = luigi.BoolParameter()

    def complete(self):
        outputs = luigi.task.flatten(self.output())
        for output in outputs:
            if self.force and output.exists():
                output.remove()
        return all(map(lambda output: output.exists(), outputs))


class MyTask(BaseTask):
    def output(self):
        return luigi.LocalTarget("path/to/done/file.txt")

    def run(self):
        with self.output().open('w') as out_file:
            out_file.write('Complete')

运行任务时,将按预期创建输出文件。使用 实例化类后force=True,输出文件将仍然存在,直到complete()被调用。

task = MyTask()
task.run()
task.complete()
# True

new_task = MyTask(force=True)
new_task.output().exists()
# True
new_task.complete()
# False
new_task.output().exists()
# False
于 2019-02-14T18:15:09.463 回答
1

@cangers BaseTask 的改进,以在无法删除目标时引发错误。

class BaseTask(luigi.Task):
    force = luigi.BoolParameter(significant=False, default=False)

    def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)
    if self.force is True:
        outputs = luigi.task.flatten(self.output())
        for out in outputs:
            if out.exists():
                try:
                    out.remove()
                except AttributeError:
                    raise NotImplementedError


      

于 2021-03-27T06:00:07.143 回答
0

d6tflow允许您重置和强制重新运行任务,请参阅 https://d6tflow.readthedocs.io/en/latest/control.html#manually-forcing-task-reset-and-rerun中的详细信息。

# force execution including downstream tasks
d6tflow.run([TaskTrain()],force=[TaskGetData()])

# reset single task
TaskGetData().invalidate()

# reset all downstream task output
d6tflow.invalidate_downstream(TaskGetData(), TaskTrain())

# reset all upstream task input
d6tflow.invalidate_upstream(TaskTrain())

警告:它仅适用于 d6tflow 任务和目标,它们是修改后的本地目标,但不适用于所有 luigi 目标。应该带您走很长一段路,并针对数据科学工作流程进行了优化。适用于本地工作人员,尚未在中央服务器上测试。

于 2019-01-20T05:00:17.413 回答
0

我使用它来强制重新生成输出而无需先将其删除,并允许您选择要重新生成的类型。在我们的用例中,我们希望旧生成的文件继续存在,直到它们被新版本重写。

# generation.py
class ForcibleTask(luigi.Task):
    force_task_families = luigi.ListParameter(
        positional=False, significant=False, default=[]
    )

    def complete(self):
        print("{}: check {}".format(self.get_task_family(), self.output().path))
        if not self.output().exists():
            self.oldinode = 0  # so any new file is considered complete
            return False
        curino = pathlib.Path(self.output().path).stat().st_ino
        try:
            x = self.oldinode
        except AttributeError:
            self.oldinode = curino

        if self.get_task_family() in self.force_task_families:
            # only done when file has been overwritten with new file
            return self.oldinode != curino

        return self.output().exists()

示例用法

class Generate(ForcibleTask):
    date = luigi.DateParameter()
    def output(self):
        return luigi.LocalTarget(
            self.date.strftime("generated-%Y-%m-%d")
        )

调用

luigi --module generation Generate '--Generate-force-task-families=["Generate"]'
于 2019-09-12T22:08:29.377 回答
0

您可以使用内存中的输出存储,并且每次都会清除它们。我有一个解决方案,不知道这是否适合您的需要。

import uuid
class taskname(luigi.Task):
    id = luigi.Parameter(default=uuid.uuid5(uuid.NAMESPACE_DNS, random().__str__()).__str__(), positional=True) # this helps in getting a new id everytime it is executed.

   def output(self):
    # This is just to ensure the task is complete
      return luigi.mock.MockTarget(f'taskname-{self.id}')
   def run(self):
     #do your process here
     # if your process is successful then run this
     self.output().open('w').close()  #persists the object in memory for the scheduler to understand the task is complete.

我们使用类中创建的 id 来命名输出中的模拟目标。因此,即使您同时运行同一个 DAG 两次,中央调度程序也找不到此输出。只有当前批次可以在内存中访问它们。

于 2022-02-22T07:51:56.747 回答