我有一个luigi
预处理任务,将我的原始数据拆分成更小的文件。然后这些文件将由实际管道处理。
所以关于参数,我想要求每个管道都有一个预处理文件 id 作为参数。但是,此文件 id 仅在预处理步骤中生成,因此仅在运行时才知道。为了说明我的想法,我提供了这个不起作用的代码:
import luigi
import subprocess
import random
class GenPipelineFiles(luigi.Task):
input_file = luigi.Parameter()
def requires(self):
pass
def output(self):
for i in range(random.randint(0,10)):
yield luigi.LocalTarget("output/{}_{}.txt".format(self.input_file, i))
def run(self):
for iout in self.output:
command = "touch {}".format(iout.fname)
subprocess.call(command, shell=True)
class RunPipelineOnSmallChunk(luigi.Task):
pass
class Experiment(luigi.WrapperTask):
input_file = luigi.Parameter(default="ex")
def requires(self):
file_ids = GenPipelineFiles(input_file=self.input_file)
for file_id in file_ids:
yield RunPipelineOnSmallChunk(directory=self.input_file, file_id=file_id)
luigi.run()
包装任务Experiment
应该
首先,以某种方式需要将原始数据拆分为文档
其次,要求实际管道具有获得的预处理文件ID。
中的随机输出文件数GenPipelineFiles
表明这不能硬编码到Experiment
'srequires
中。
可能与此相关的一个问题是,一个luigi
任务正确地只有一个输入目标和一个输出目标。可能有关如何对多个输出进行建模的注释GenPipelineFiles
也可以解决该问题。