4

我有一个 Luigi 任务,它是requires一个子任务。子任务取决于父任务(即正在执行的任务)传递的参数require。我知道您可以通过设置指定子任务可以使用的参数...

def requires(self):
    return subTask(some_parameter)

...然后在子任务上,通过设置接收参数...

x = luigi.Parameter()

不过,这似乎只允许您通过一个参数。通过任意数量的参数(无论我想要什么类型)发送的最佳方式是什么?真的我想要这样的东西:

class parentTask(luigi.Task):

    def requires(self):
        return subTask({'file_postfix': 'foo',
                        'file_content': 'bar'
        })

    def run(self):
        return


class subTask(luigi.Task):
    params = luigi.DictParameter()

    def output(self):
        return luigi.LocalTarget("file_{}.csv".format(self.params['file_postfix']))

    def run(self):
        with self.output().open('w') as f:
            f.write(self.params['file_content'])

正如你所看到的,我尝试使用luigi.DictParameter而不是直行luigi.Parameter,但是TypeError: unhashable type: 'dict'当我运行上面的代码时,我是从 Luigi 深处的某个地方得到的。

运行 Python 2.7.11、Luigi 2.1.1

4

4 回答 4

4

通过任意数量的参数(无论我想要什么类型)发送的最佳方式是什么?

最好的方法是使用命名参数,例如,

#in requires
return MySampleSubTask(x=local_x, y=local_y)

class MySampleSubTask(luigi.Task):
    x = luigi.Parameter()
    y = luigi.Parameter()
于 2016-09-15T13:34:52.140 回答
2

通过任意数量的参数(无论我想要什么类型)发送的最佳方式是什么?

你可以按照这个例子。您将有一个占位符来定义所有需要传递的参数(ParameterCollector)。如果您需要在许多子任务的情况下将参数传递给子任务,这将避免在每个单个任务上定义参数。

class ParameterCollector(object):
    param1 = luigi.Parameter()
    param2 = luigi.Parameter()

    def collect_params(self):
        return {'param1': self.param1, 'param2': self.param2}


class TaskB(ParameterCollector, luigi.Task):
    def requires(self):
        return []

    def output(self):
        return luigi.LocalTarget('/tmp/task1_success')

    def run(self):
        with self.output().open('w') as f:
            f.write(self.param1)


class TaskA(ParameterCollector, luigi.Task):
    def requires(self):
        a = TaskB(**self.collect_params())
        print(a)
        return a

    def output(self):
        return luigi.LocalTarget('/tmp/task2_success')

    def run(self):
        with self.output().open('w') as f:
            f.write(str([self.param1, self.param2]))


if __name__ == '__main__':
    luigi.run()
于 2017-11-01T12:13:23.730 回答
0

好的,所以我发现这在 python 3.5 中按预期工作(并且问题在 3.4 中仍然存在)。

今天没有时间深入了解这一点,所以没有更多细节。

于 2016-08-24T14:27:36.813 回答
0

您可以使用以下内容。

class DownloadFile(luigi.Task):
    """This is file download task used to get
     the file from file parser"""
    id = luigi.Parameter(default=uuid.uuid4().__str__(), positional=True)
    #master_task_id = luigi.Parameter(positional=True)
    pipeline_data = luigi.DictParameter(default={}, significant=False,
                                    
    visibility=luigi.parameter.ParameterVisibility.PRIVATE,
                                    positional=True)
def output(self):
    # add the file output here
    return MockTarget("FileDownload", mirror_on_stderr=True)

def run(self):
    time.sleep(3)
    parsed_data = self.pipeline_data['pipeline_data'] #this helps in parsing the dict
于 2021-11-18T13:59:08.260 回答