我在 Google Cloud Dataflow 中运行批处理管道。我需要在一个管道中读取另一个管道先前写入的对象。最简单的 wa 对象是泡菜/莳萝。
写作效果很好,编写了许多文件,每个文件都有一个腌制对象。当我手动下载文件时,我可以解压文件。编写代码:beam.io.WriteToText('gs://{}', coder=coders.DillCoder())
但是每次读数都会中断,并出现以下错误之一。阅读代码:beam.io.ReadFromText('gs://{}*', coder=coders.DillCoder())
任何一个...
File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 266, in load
obj = pik.load()
File "/usr/lib/python2.7/pickle.py", line 858, in load
dispatch[key](self)
KeyError: '\x90'
...或者...
File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 423, in find_class
return StockUnpickler.find_class(self, module, name)
File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
__import__(module)
ImportError: No module named measur
(对象的类位于带有 的路径中measure
,但不确定为什么它错过了那里的最后一个字符)
我尝试使用默认编码器,以及 aBytesCoder
和 pickling & unpickling 作为管道中的自定义任务。
我的工作假设是读者逐行拆分文件,因此将单个泡菜(其中包含新行)视为多个对象。如果是这样,有没有办法避免这种情况?
我可以尝试自己构建一个阅读器,但我很犹豫,因为这似乎是一个很好解决的问题(例如,Beam 已经有一种格式可以将对象从一个管道阶段移动到另一个阶段)。
切向相关:如何在 Google Cloud DataFlow 作业中从 GCS 读取 blob(pickle)文件?
谢谢!