我正在使用 Luigi 运行多个任务,然后我需要将输出批量传输到标准化文件位置。我写了一个 WrapperTask 用一个重写的complete()
方法来做到这一点:
from luigi.task import flatten
class TaskX(luigi.WrapperTask):
date = luigi.DateParameter()
client = luigi.s3.S3Client()
def requires(self):
yield TaskA(date=self.date)
yield TaskB(date=self.date)
def complete(self):
tasks_complete = all(r.complete() for r in flatten(self.requires()))
## at the end of everything, batch copy the files
if tasks_complete:
self.client.copy('current-old', 'current')
return True
else:
return False
if __name__ == "__main__":
luigi.run()
complete()
但是当过程实际完成时,我无法获得要调用的条件部分。
我认为这是因为其他人指出的异步行为,但我不知道如何解决它。
我尝试使用以下命令行参数运行 Luigi:
$ PYTHONPATH="" luigi --module x TaskX --worker-retry-external-task
但这似乎无法正常工作。这是处理此类任务的正确方法吗?
另外,我很好奇——有没有人有过这个--worker-retry-external-task
命令的经验?我很难理解它。
在源代码中,
def _is_external(task):
return task.run is None or task.run == NotImplemented
被调用以确定 LuigiTask 是否有run()
方法,而 aWrapperTask
没有。因此,我希望--retry-external-task
标志在complete()
完成之前重试此操作,从而执行操作。但是,只是在解释器中玩弄让我相信:
>>> import luigi_newsletter_process
>>> task = luigi_newsletter_process.Newsletter()
>>> task.run
<bound method Newsletter.run of Newsletter(date=2016-06-22, use_s3=True)>
>>> task.run()
>>> task.run == None
False
>>> task.run() == None
True
这段代码片段并没有按照它认为的那样做。
我是不是在这里离群?