在我的很多项目中,我使用luigi作为流水线工具。这让我想到用它来实现参数搜索。该标准luigi.file.LocalTarget
有一种非常幼稚的方法来处理参数,这也显示在文档的示例中:
def output(self):
return luigi.LocalTarget("data/artist_streams_%s.tsv" % self.date_interval)
即,参数保存在文件名中。这使得检查某个参数组合是否已经计算变得容易。一旦任务的参数更复杂,这就会变得混乱。
这是参数搜索的一个非常简单的想法:
import luigi
class Sum(luigi.Task):
long_ = luigi.Parameter()
list_ = luigi.Parameter()
of = luigi.Parameter()
parameters = luigi.Parameter()
def output(self):
return luigi.LocalTarget("task{}_{}_{}_{}.txt".format(self.long_,
self.list_,
self.of,
self.parameters))
def run(self):
sum_ = self.long_ + self.list_ + self.of + self.parameters
with self.output().open('w') as out_file:
out_file.write(str(sum_))
class ParameterSearch(luigi.Task):
def requires(self):
list_of_parameter_combinations = [
{
"long_" : 1,
"list_" : 2,
"of" : 3,
"parameters" : 4
},{
"long_" : 5,
"list_" : 6,
"of" : 7,
"parameters" : 8
}
]
for pc in list_of_parameter_combinations:
yield Sum(**pc)
当然,在这个例子中,所有四个参数都可以编码在文件名中,但是不需要很多幻想,这种方法可以达到边界。例如,考虑类似数组的参数。
我的后续想法是将参数和结果存储在某种信封对象中,然后可以将其保存为目标。然后,文件名可以是第一次模糊搜索的参数的某种散列。
有信封班
class Envelope(object):
@classmethod
def hashify(cls, params):
return hash(frozenset(params.items()))
def __init__(self, result, **params):
self.params = {}
for k in params:
self.params[k] = params.get(k)
def hash(self):
return Envelope.hashify(self.params)
然后是新的 Target,它增强了 LocalTarget 并能够检查信封内的所有参数是否匹配:
class EnvelopedTarget(luigi.file.LocalTarget):
fs = luigi.file.LocalFileSystem()
def __init__(self, params, path=None, format=None, is_tmp=False):
self.path = path
self.params = params
if format is None:
format = luigi.file.get_default_format()
if not path:
if not is_tmp:
raise Exception('path or is_tmp must be set')
path = os.path.join(tempfile.gettempdir(), 'luigi-tmp-%09d' % random.randint(0, 999999999))
super(EnvelopedTarget, self).__init__(path)
self.format = format
self.is_tmp = is_tmp
def exists(self):
path = self.path
if '*' in path or '?' in path or '[' in path or '{' in path:
logger.warning("Using wildcards in path %s might lead to processing of an incomplete dataset; "
"override exists() to suppress the warning.", path)
if self.fs.exists(path):
with self.open() as fin:
envelope = pickle.load(fin)
try:
assert len(envelope.params) == len(self.params)
for param,paramval in self.params.items():
assert paramval == envelope.params.get(param)
except(AssertionError):
return False
return True
else:
return False
这里的问题是,使用这个目标会增加一些原本 luigi 旨在最小化的样板。我设置了一个新的基础任务
class BaseTask(luigi.Task):
def output(self, envelope):
path = '{}{}.txt'.format(type(self).__name__, envelope.hash())
params = envelope.params
return EnvelopedTarget(params, path=path)
def complete(self):
envelope = Envelope(None, **self.param_kwargs)
outputs = flatten(self.output(envelope))
if len(outputs) == 0:
warnings.warn(
"Task %r without outputs has no custom complete() method" % self,
stacklevel=2
)
return False
return all(map(lambda output: output.exists(), outputs))
def run(self):
result, outparams = self.my_run()
envelope = Envelope(result, **outparams)
with self.output(envelope).open('w') as fout:
pickle.dump(envelope, fout)
生成的EnvelopedSum
任务将非常小:
class EnvelopedSum(BaseTask):
long_ = luigi.Parameter()
list_ = luigi.Parameter()
of = luigi.Parameter()
parameters = luigi.Parameter()
def my_run(self):
return sum(self.param_kwargs.values()), self.param_kwargs
此任务可以以与开始的任务相同的方式运行Sum
。
注意:这个如何封装 luigi-task-results 的示例实现远非稳定,而是更多地说明了我所说的封装结果和参数的含义。
我的问题是:难道没有更简单的方法来处理 luigi 中的许多复杂参数吗?
后续问题:有没有人考虑过保存已执行参数搜索的代码版本(和/或 subtaks 的包版本)的记录?
任何关于在哪里阅读有关此主题的评论也表示赞赏。
笔记:
您可能需要一些导入才能使其运行:
from luigi.task import flatten
import warnings
import pickle