2

在我的很多项目中,我使用luigi作为流水线工具。这让我想到用它来实现参数搜索。该标准luigi.file.LocalTarget有一种非常幼稚的方法来处理参数,这也显示在文档的示例中:

def output(self):
    return luigi.LocalTarget("data/artist_streams_%s.tsv" % self.date_interval)

即,参数保存在文件名中。这使得检查某个参数组合是否已经计算变得容易。一旦任务的参数更复杂,这就会变得混乱。

这是参数搜索的一个非常简单的想法:

import luigi
class Sum(luigi.Task):

    long_ = luigi.Parameter()
    list_ = luigi.Parameter()
    of = luigi.Parameter()
    parameters = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget("task{}_{}_{}_{}.txt".format(self.long_,
                                                              self.list_,
                                                              self.of,
                                                              self.parameters))

    def run(self):

        sum_ = self.long_ + self.list_ + self.of + self.parameters
        with self.output().open('w') as out_file:
            out_file.write(str(sum_))


class ParameterSearch(luigi.Task):

    def requires(self):

        list_of_parameter_combinations = [
            {
                "long_" : 1,
                "list_" : 2,
                "of" : 3,
                "parameters" : 4

            },{
                "long_" : 5,
                "list_" : 6,
                "of" : 7,
                "parameters" : 8
            }
        ]

        for pc in list_of_parameter_combinations:
            yield Sum(**pc)

当然,在这个例子中,所有四个参数都可以编码在文件名中,但是不需要很多幻想,这种方法可以达到边界。例如,考虑类似数组的参数。

我的后续想法是将参数和结果存储在某种信封对象中,然后可以将其保存为目标。然后,文件名可以是第一次模糊搜索的参数的某种散列。

有信封班

class Envelope(object):

    @classmethod
    def hashify(cls, params):
        return hash(frozenset(params.items()))

    def __init__(self, result, **params):

        self.params = {}
        for k in params:
            self.params[k] = params.get(k)

    def hash(self):
        return Envelope.hashify(self.params)

然后是新的 Target,它增强了 LocalTarget 并能够检查信封内的所有参数是否匹配:

class EnvelopedTarget(luigi.file.LocalTarget):

    fs = luigi.file.LocalFileSystem()

    def __init__(self, params, path=None, format=None, is_tmp=False):
        self.path = path
        self.params = params

        if format is None:
            format = luigi.file.get_default_format()

        if not path:
            if not is_tmp:
                raise Exception('path or is_tmp must be set')
            path = os.path.join(tempfile.gettempdir(), 'luigi-tmp-%09d' % random.randint(0, 999999999))
        super(EnvelopedTarget, self).__init__(path)
        self.format = format
        self.is_tmp = is_tmp

    def exists(self):
        path = self.path
        if '*' in path or '?' in path or '[' in path or '{' in path:
            logger.warning("Using wildcards in path %s might lead to processing of an incomplete dataset; "
                           "override exists() to suppress the warning.", path)
        if self.fs.exists(path):
            with self.open() as fin:
                envelope = pickle.load(fin)

                try:
                    assert len(envelope.params) == len(self.params)
                    for param,paramval in self.params.items():
                        assert paramval == envelope.params.get(param)

                except(AssertionError):
                    return False
            return True
        else:
            return False

这里的问题是,使用这个目标会增加一些原本 luigi 旨在最小化的样板。我设置了一个新的基础任务

class BaseTask(luigi.Task):

    def output(self, envelope):
        path = '{}{}.txt'.format(type(self).__name__, envelope.hash())
        params = envelope.params
        return EnvelopedTarget(params, path=path)

    def complete(self):

        envelope = Envelope(None, **self.param_kwargs)

        outputs = flatten(self.output(envelope))
        if len(outputs) == 0:
            warnings.warn(
                "Task %r without outputs has no custom complete() method" % self,
                stacklevel=2
            )
            return False

        return all(map(lambda output: output.exists(), outputs))

    def run(self):

        result, outparams = self.my_run()

        envelope = Envelope(result, **outparams)

        with self.output(envelope).open('w') as fout:
            pickle.dump(envelope, fout)

生成的EnvelopedSum任务将非常小:

class EnvelopedSum(BaseTask):

    long_ = luigi.Parameter()
    list_ = luigi.Parameter()
    of = luigi.Parameter()
    parameters = luigi.Parameter()

    def my_run(self):
        return sum(self.param_kwargs.values()), self.param_kwargs

此任务可以以与开始的任务相同的方式运行Sum

注意:这个如何封装 luigi-task-results 的示例实现远非稳定,而是更多地说明了我所说的封装结果和参数的含义。

我的问题是:难道没有更简单的方法来处理 luigi 中的许多复杂参数吗?

后续问题:有没有人考虑过保存已执行参数搜索的代码版本(和/或 subtaks 的包版本)的记录?

任何关于在哪里阅读有关此主题的评论也表示赞赏。

笔记:

您可能需要一些导入才能使其运行:

from luigi.task import flatten
import warnings
import pickle
4

1 回答 1

2

您可能会在邮件列表中对此类建议做出更好的回应。Luigi 任务代码已经生成了参数的 MD5 散列,以生成您可以获取的唯一任务标识符。

路易吉/task.py#L128

# task_id is a concatenation of task family, the first values of the first 3 parameters
# sorted by parameter name and a md5hash of the family/parameters as a cananocalised json.
param_str = json.dumps(params, separators=(',', ':'), sort_keys=True)
param_hash = hashlib.md5(param_str.encode('utf-8')).hexdigest()
于 2016-10-16T20:33:21.323 回答