12

我正在使用Spotify 的 Luigi在 Python 3.6 中编写我的第一个项目,以在管道中安排一些自然语言处理任务。

我注意到output()一个类的函数Task总是返回某种Target对象,它只是某个地方的某个文件,无论是本地的还是远程的。因为我的任务会产生更复杂的数据结构,比如解析树,所以将它们作为字符串写入文件并在之后再次读取它们是非常尴尬的。

因此,我想问一下是否有可能在管道内的任务之间传递 Python 对象?

4

2 回答 2

16

简短的回答:没有。

Luigi 参数仅限于日期/日期时间对象、字符串、整数和浮点数。请参阅文档以供参考

这意味着您需要将复杂的数据结构序列化为字符串(使用 json、msgpack、任何您喜欢的序列化器,甚至压缩它)并将其作为字符串参数传递。

当然,你可以编写一个自定义的 Parameter 子类,但你需要基本实现serialize 和 parse 方法

但请注意:如果您使用参数而不是将计算的数据保存到目标,您将失去使用 Luigi 的一个关键优势:如果树中的父任务失败的次数超过您指定的重试次数,那么您将需要再次运行计算该复杂数据结构的任务。如果您的任务计算复杂数据或花费大量时间或消耗大量资源,那么您应该将输出保存为目标,以便不必再次进行所有昂贵的计算。

展望未来:另一个任务可能也需要这些数据,那么为什么不保存它呢?

此外,请注意目标不仅仅是文件:您可以将数据保存到数据库表、Redis、Hadoop、弹性搜索索引等等: http: //luigi.readthedocs.io/en/stable/api/luigi .contrib.html#submodules

于 2017-02-28T20:14:09.467 回答
1

还有其他 - 仍然有点 hacky - 方法来实现你试图用目标而不是参数做的事情。

有一个特殊的MockFile目标luigi.mock,允许您将它的“文件”存储在内存中。

它的 api 类似于其他 Target 继承类,所以你必须open,readwrite它。突然它只支持string输入,所以你仍然需要序列化你的对象(这是由于通过进程之间的管道发送这些数据)。请参阅以下示例(yaml 序列化):

import yaml
from luigi import Task

class TaskA(Task):
    def output(self):
        return MockFile('whatever')

    def run(self):
        object_to_send = yaml.dump({"example": "dict"})

        _out = self.output().open('r')
        _out.write(object_to_send)
        _out.close()


class TaskB(Task):
    def requires(self):
        return TaskA()

    def run(self):
        _in = self.input().read('r')
        serialised = _in.read()
        deserialised = yaml.load(serialised)
        print(deserialised)

请注意,序列化大对象可能需要很多时间。

于 2019-05-27T16:41:46.647 回答