14

我有一个执行一些不稳定计算的 luigi 任务。考虑一个有时不会收敛的优化过程。

import luigi

MyOptimizer(luigi.Task):
    input_param: luigi.Parameter()
    output_filename = luigi.Parameter(default='result.json')

    def run(self):
        optimize_something(self.input_param, self.output().path)

    def output(self):
        return luigi.LocalTarget(self.output_filename)

现在我想构建一个包装器任务,该任务将使用不同的输入参数多次运行此优化器,并将获取第一次运行收敛的输出。

我现在实现它的方式是使用MyOptimizer,因为如果它失败,luigi 会认为包装器任务也失败了,但我可以接受一些MyOptimizer失败的实例。

MyWrapper(luigi.Task):
    input_params_list = luigi.ListParameter()
    output_filename = luigi.Parameter(default='result.json')

    def run(self):
        for input_param in self.input_params_list:
            try:
                optimize_something(self.input_param, self.output().path)
                print(f"Optimizer succeeded with input {input_param}")
                break
            except Exception as e:
                print(f"Optimizer failed with input {input_param}. Trying again...")

    def output(self):
        return luigi.LocalTarget(self.output_filename)

问题是这样,任务没有并行化。此外,您可以想象MyOptimizer并且optimize_something是复杂的任务,它们也参与了由 luigi 处理的数据管道,这在我的代码中造成了相当多的混乱。

我将不胜感激有关如何以类似 luigi 的方式进行这项工作的任何见解和想法:)

4

0 回答 0