0

我正在使用 Luigi 运行多个任务,然后我需要将输出批量传输到标准化文件位置。我写了一个 WrapperTask 用一个重写的complete()方法来做到这一点:

from luigi.task import flatten

class TaskX(luigi.WrapperTask):
    date = luigi.DateParameter()
    client = luigi.s3.S3Client()

    def requires(self):
        yield TaskA(date=self.date)     
        yield TaskB(date=self.date)

    def complete(self):
        tasks_complete = all(r.complete() for r in flatten(self.requires())) 

        ## at the end of everything, batch copy the files
        if tasks_complete:
            self.client.copy('current-old', 'current')
            return True
        else:
            return False


if __name__ == "__main__":
    luigi.run()

complete()但是当过程实际完成时,我无法获得要调用的条件部分。

我认为这是因为其他人指出的异步行为,但我不知道如何解决它。

我尝试使用以下命令行参数运行 Luigi:

$ PYTHONPATH="" luigi --module x TaskX --worker-retry-external-task

但这似乎无法正常工作。这是处理此类任务的正确方法吗?

另外,我很好奇——有没有人有过这个--worker-retry-external-task命令的经验?我很难理解它。

源代码中,

def _is_external(task):
    return task.run is None or task.run == NotImplemented

被调用以确定 LuigiTask 是否有run()方法,而 aWrapperTask没有。因此,我希望--retry-external-task标志在complete()完成之前重试此操作,从而执行操作。但是,只是在解释器中玩弄让我相信:

>>> import luigi_newsletter_process
>>> task = luigi_newsletter_process.Newsletter()
>>> task.run
    <bound method Newsletter.run of Newsletter(date=2016-06-22, use_s3=True)>
>>> task.run()
>>> task.run == None
False
>>> task.run() == None
True

这段代码片段并没有按照它认为的那样做。

我是不是在这里离群?

4

1 回答 1

0

我仍然认为覆盖.complete()理论上应该能够做到这一点,我仍然不确定为什么不能这样做,但如果你只是在寻找一种在运行进程后批量传输文件的方法,一个可行的解决方案是只是为了让转移发生在一个.run()方法中:

def run(self):
    logger.info('transferring into current directory')
    self.client.copy('current-old','current')
于 2016-06-23T14:55:29.753 回答