0

我在 Google Cloud Dataflow 中运行批处理管道。我需要在一个管道中读取另一个管道先前写入的对象。最简单的 wa 对象是泡菜/莳萝。

写作效果很好,编写了许多文件,每个文件都有一个腌制对象。当我手动下载文件时,我可以解压文件。编写代码:beam.io.WriteToText('gs://{}', coder=coders.DillCoder())

但是每次读数都会中断,并出现以下错误之一。阅读代码:beam.io.ReadFromText('gs://{}*', coder=coders.DillCoder())

任何一个...

  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 266, in load
    obj = pik.load()
  File "/usr/lib/python2.7/pickle.py", line 858, in load
    dispatch[key](self)
KeyError: '\x90'

...或者...

  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 423, in find_class
    return StockUnpickler.find_class(self, module, name)
  File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
    __import__(module)
ImportError: No module named measur

(对象的类位于带有 的路径中measure,但不确定为什么它错过了那里的最后一个字符)

我尝试使用默认编码器,以及 aBytesCoder和 pickling & unpickling 作为管道中的自定义任务。

我的工作假设是读者逐行拆分文件,因此将单个泡菜(其中包含新行)视为多个对象。如果是这样,有没有办法避免这种情况?

我可以尝试自己构建一个阅读器,但我很犹豫,因为这似乎是一个很好解决的问题(例如,Beam 已经有一种格式可以将对象从一个管道阶段移动到另一个阶段)。

切向相关:如何在 Google Cloud DataFlow 作业中从 GCS 读取 blob(pickle)文件?

谢谢!

4

2 回答 2

1

ReadFromText旨在读取文本文件中的新行分隔记录,因此不适合您的用例。实施FileBasedSource也不是一个好的解决方案,因为它是为读取具有多条记录的大文件而设计的(并且通常将这些文件拆分为碎片以进行并行处理)。因此,在您的情况下,Python SDK 当前的最佳解决方案是自己实现源代码。这可以像ParDo读取文件并生成PCollection记录一样简单。如果您ParDo生成大量记录,请考虑在该步骤之后添加一个apache_beam.transforms.util.Reshuffle步骤,这将允许跑步者更好地并行化后续步骤。对于 Java SDK,我们FileIO已经提供了转换以使这更容易。

于 2018-02-01T17:22:15.043 回答
0

编码为string_escape转义换行符,因此 Beam 看到的唯一换行符是泡菜之间的换行符:

class DillMultiCoder(DillCoder):
    """
    Coder that allows multi-line pickles to be read
    After an object is pickled, the bytes are encoded as `unicode_escape`,
    meaning newline characters (`\n`) aren't in the string.

    Previously, the presence of newline characters these confues the Dataflow
    reader, as it can't discriminate between a new object and a new line
    within a pickle string
    """

    def _create_impl(self):
        return coder_impl.CallbackCoderImpl(
            maybe_dill_multi_dumps, maybe_dill_multi_loads)


def maybe_dill_multi_dumps(o):
    # in Py3 this needs to be `unicode_escape`
    return maybe_dill_dumps(o).encode('string_escape')


def maybe_dill_multi_loads(o):
    # in Py3 this needs to be `unicode_escape`
    return maybe_dill_loads(o.decode('string_escape'))

对于大泡菜,我还需要将缓冲区大小设置为 8MB - 在之前的缓冲区大小 (8kB) 上,一个 120MB 的文件旋转了 2 天的 CPU 时间:

class ReadFromTextPickle(ReadFromText):
    """
    Same as ReadFromText, but with a really big buffer. With the standard 8KB
    buffer, large files can be read on a loop and never finish

    Also added DillMultiCoder
    """

    def __init__(
            self,
            file_pattern=None,
            min_bundle_size=0,
            compression_type=CompressionTypes.AUTO,
            strip_trailing_newlines=True,
            coder=DillMultiCoder(),
            validate=True,
            skip_header_lines=0,
            **kwargs):
        # needs commenting out, not sure why    
        # super(ReadFromTextPickle, self).__init__(**kwargs)
        self._source = _TextSource(
            file_pattern,
            min_bundle_size,
            compression_type,
            strip_trailing_newlines=strip_trailing_newlines,
            coder=coder,
            validate=validate,
            skip_header_lines=skip_header_lines,
            buffer_size=8000000)

另一种方法是在文件上实现PickleFileSource继承FileBasedSource和调用pickle.load——每次调用都会产生一个新对象。但是周围有很多复杂情况offset_range_tracker,看起来比严格必要的提升更多

于 2018-01-16T05:09:31.787 回答