我有一个执行一些不稳定计算的 luigi 任务。考虑一个有时不会收敛的优化过程。
import luigi
MyOptimizer(luigi.Task):
input_param: luigi.Parameter()
output_filename = luigi.Parameter(default='result.json')
def run(self):
optimize_something(self.input_param, self.output().path)
def output(self):
return luigi.LocalTarget(self.output_filename)
现在我想构建一个包装器任务,该任务将使用不同的输入参数多次运行此优化器,并将获取第一次运行收敛的输出。
我现在实现它的方式是不使用MyOptimizer
,因为如果它失败,luigi 会认为包装器任务也失败了,但我可以接受一些MyOptimizer
失败的实例。
MyWrapper(luigi.Task):
input_params_list = luigi.ListParameter()
output_filename = luigi.Parameter(default='result.json')
def run(self):
for input_param in self.input_params_list:
try:
optimize_something(self.input_param, self.output().path)
print(f"Optimizer succeeded with input {input_param}")
break
except Exception as e:
print(f"Optimizer failed with input {input_param}. Trying again...")
def output(self):
return luigi.LocalTarget(self.output_filename)
问题是这样,任务没有并行化。此外,您可以想象MyOptimizer
并且optimize_something
是复杂的任务,它们也参与了由 luigi 处理的数据管道,这在我的代码中造成了相当多的混乱。
我将不胜感激有关如何以类似 luigi 的方式进行这项工作的任何见解和想法:)