2

精简版:

Python中是否有可以执行gmake的任务调度程序?特别是,我需要一个递归解决依赖关系的任务调度程序。我调查了 Luigi,但它似乎只解决了直接依赖关系。

长版:

我正在尝试构建一个以预定义顺序处理大量数据文件的工作流,后面的任务可能直接依赖于一些较早任务的输出,但反过来,这些输出的正确性甚至依赖于较早的任务.

例如,让我们考虑如下依赖映射:

A <- B <- C

当我从任务 C 请求结果时,Luigi 会自动调度 B,然后由于 B 依赖于 A,它会调度 A。所以最终的运行顺序是 [A, B, C]。每个任务都会创建一个正式的输出文件作为成功执行的标志。这对于第一次运行来说很好。

现在,假设我在任务 A 的输入数据中犯了一个错误。显然,我需要再次重新运行整个链。但是,简单地从 A 中删除输出文件是行不通的。因为 Luigi 看到 B 和 C 的输出,得出结论任务 C 的要求已经满足,不需要运行。我必须从依赖于 A 的所有任务中删除输出文件,以便它们再次运行。在简单的情况下,我必须删除 A、B 和 C 中的所有输出文件,以便 Luigi 检测到对 A 所做的更改。

这是一个非常不方便的功能。如果我有数十或数百个任务相互之间具有相当复杂的依赖关系,那么当其中一项任务需要重新运行时,真的很难判断哪些任务会受到影响。对于任务调度程序并具有解决依赖关系的能力,我希望 Luigi 能够像 GNU-Make 一样行事,其中递归检查依赖关系,并且当最深的源文件之一发生更改时,将重建最终目标。

我想知道是否有人可以就这个问题提供一些建议。我是否缺少 Luigi 中的一些关键功能?是否有其他任务调度程序可以充当 gmake?我对基于 Python 的包特别感兴趣,并且更喜欢那些支持 Windows 的包。

非常感谢!

4

2 回答 2

3

通过覆盖您的任务的完整方法似乎是可能的。您必须通过依赖关系图一直应用它。

def complete(self):
    outputs = self.flatten(self.output())
    if not all(map(lambda output: output.exists(), outputs)):
        return False
    for task in self.flatten(self.requires()):
        if not task.complete():
            for output in outputs:
                if output.exists():
                    output.remove()
            return False
    return True
于 2017-02-26T13:41:58.830 回答
0

事实上,这很不方便,d6tflow检查所有上游依赖项的完整性,而不仅仅是 TaskC 的输出。如果重置 TaskA,TaskC 也将不完整并自动重新运行。

# reset TaskA => makes TaskC incomplete
TaskA().invalidate() 
d6tflow.preview(TaskC()) # all tasks pending

有关详细信息,请参阅下面的完整示例和d6tflow 文档

import d6tflow
import pandas as pd

class TaskA(d6tflow.tasks.TaskCachePandas):  # save dataframe in memory

    def run(self):        
        self.save(pd.DataFrame({'a':range(10)})) # quickly save dataframe

class TaskB(d6tflow.tasks.TaskCachePandas):

    def requires(self):
        return TaskA() # define dependency

    def run(self):
        df = self.input().load() # quickly load required data
        df = df*2
        self.save(df)

class TaskC(d6tflow.tasks.TaskCachePandas):

    def requires(self):
        return TaskB()

    def run(self):
        df = self.input().load() 
        df = df*2
        self.save(df)

# Check task dependencies and their execution status
d6tflow.preview(TaskC())
'''
└─--[TaskC-{} (PENDING)]
   └─--[TaskB-{} (PENDING)]
      └─--[TaskA-{} (PENDING)]
'''

# Execute the model training task including dependencies
d6tflow.run(TaskC())

'''
===== Luigi Execution Summary =====

Scheduled 3 tasks of which:
* 3 ran successfully:
    - 1 TaskA()
    - 1 TaskB()
    - 1 TaskC()
'''

# all tasks complete
d6tflow.preview(TaskC())

'''
└─--[TaskC-{} (COMPLETE)]
   └─--[TaskB-{} (COMPLETE)]
      └─--[TaskA-{} (COMPLETE)]
'''

# reset TaskA => makes TaskC incomplete
TaskA().invalidate() 
d6tflow.preview(TaskC())
'''
└─--[TaskC-{} (PENDING)]
   └─--[TaskB-{} (PENDING)]
      └─--[TaskA-{} (PENDING)]
'''
于 2019-02-21T03:35:54.773 回答