3

我有一个luigi预处理任务,将我的原始数据拆分成更小的文件。然后这些文件将由实际管道处理。

所以关于参数,我想要求每个管道都有一个预处理文件 id 作为参数。但是,此文件 id 仅在预处理步骤中生成,因此仅在运行时才知道。为了说明我的想法,我提供了这个不起作用的代码:

import luigi
import subprocess 
import random


class GenPipelineFiles(luigi.Task):

    input_file = luigi.Parameter()

    def requires(self):
        pass

    def output(self):

        for i in range(random.randint(0,10)):
            yield luigi.LocalTarget("output/{}_{}.txt".format(self.input_file, i))

    def run(self):

        for iout in self.output:
            command = "touch {}".format(iout.fname)
            subprocess.call(command, shell=True)


class RunPipelineOnSmallChunk(luigi.Task):
    pass


class Experiment(luigi.WrapperTask):

    input_file = luigi.Parameter(default="ex")

    def requires(self):

        file_ids = GenPipelineFiles(input_file=self.input_file)

        for file_id in file_ids:
            yield RunPipelineOnSmallChunk(directory=self.input_file, file_id=file_id)


luigi.run()

包装任务Experiment应该

  1. 首先,以某种方式需要将原始数据拆分为文档

  2. 其次,要求实际管道具有获得的预处理文件ID。

中的随机输出文件数GenPipelineFiles表明这不能硬编码到Experiment'srequires中。

可能与此相关的一个问题是,一个luigi任务正确地只有一个输入目标和一个输出目标。可能有关如何对多个输出进行建模的注释GenPipelineFiles也可以解决该问题。

4

1 回答 1

2

处理多个输出的一种简单方法是创建一个以输入文件命名的目录,并将拆分后的输出文件放入以输入文件命名的目录中。这样,依赖任务就可以检查目录是否存在。假设我有一个输入文件 123.txt,然后我创建一个目录 123_split,其中文件 1.txt、2.txt、3.txt 作为 的输出GenPipelineFiles,然后创建一个目录 123_processed 和 1.txt、2.txt、3 .txt 作为 .txt 的输出RunPipelineOnSmallChunk

对于您的requires方法Experiment,您必须返回要运行的任务,例如在列表中。您编写的方式file_ids = GenPipelineFiles(input_file=self.input_file)使我认为该对象的 run 方法没有被调用,因为该方法没有返回它。

这是一些示例代码,可在每个文件的基础上处理目标(但不是每个文件的任务)。我仍然认为有一个目录的单个输出目标或某种类型的哨兵文件来表明你已经完成了更安全。除非任务确保创建每个目标,否则原子性就会丢失。

PYTHONPATH=. luigi --module sampletask RunPipelineOnSmallChunk --local-scheduler

示例任务.py

import luigi
import os
import subprocess
import random


class GenPipelineFiles(luigi.Task):

    inputfile = luigi.Parameter()
    num_targets = random.randint(0,10)

    def requires(self):
        pass

    def get_prefix(self):
        return self.inputfile.split(".")[0]

    def get_dir(self):
        return "split_{}".format(self.get_prefix())

    def output(self):
        targets = []
        for i in range(self.num_targets):
            targets.append(luigi.LocalTarget("  {}/{}_{}.txt".format(self.get_dir(), self.get_prefix(), i)))
         return targets

    def run(self):
        if not os.path.exists(self.get_dir()):
            os.makedirs(self.get_dir())
        for iout in self.output():
            command = "touch {}".format(iout.path)
            subprocess.call(command, shell=True)


class RunPipelineOnSmallChunk(luigi.Task):

    inputfile = luigi.Parameter(default="test")

    def get_prefix(self):
        return self.inputfile.split(".")[0]

    def get_dir(self):
        return "processed_{}".format(self.get_prefix())

    @staticmethod
    def clean_input_path(path):
        return path.replace("split", "processed")

    def requires(self):
        return GenPipelineFiles(self.inputfile)

    def output(self):
        targets = []
        for target in self.input():
            targets.append(luigi.LocalTarget(RunPipelineOnSmallChunk.clean_input_path(target.path)))
        return targets

    def run(self):
        if not os.path.exists(self.get_dir()):
            os.makedirs(self.get_dir())
        for iout in self.output():
            command = "touch {}".format(iout.path)
            subprocess.call(command, shell=True)
于 2017-02-24T13:30:40.097 回答