4

当我远程运行我的数据管道时会引发 PicklingError:数据管道是使用 Beam SDK for Python 编写的,并且我在 Google Cloud Dataflow 之上运行它。当我在本地运行管道时,它工作正常。

以下代码生成 PicklingError:这应该会重现问题

import apache_beam as beam
from apache_beam.transforms import pvalue
from apache_beam.io.fileio import _CompressionType
from apache_beam.utils.options import PipelineOptions
from apache_beam.utils.options import GoogleCloudOptions
from apache_beam.utils.options import SetupOptions
from apache_beam.utils.options import StandardOptions

if __name__ == "__main__":
  pipeline_options = PipelineOptions()
  pipeline_options.view_as(StandardOptions).runner = 'BlockingDataflowPipelineRunner'
  pipeline_options.view_as(SetupOptions).save_main_session = True
  google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
  google_cloud_options.project = "project-name"
  google_cloud_options.job_name = "job-name"
  google_cloud_options.staging_location = 'gs://path/to/bucket/staging'
  google_cloud_options.temp_location = 'gs://path/to/bucket/temp'
  p = beam.Pipeline(options=pipeline_options)
  p.run()

以下是 Traceback 开头和结尾的示例:

WARNING: Could not acquire lock C:\Users\ghousains\AppData\Roaming\gcloud\credentials.lock in 0 seconds
WARNING: The credentials file (C:\Users\ghousains\AppData\Roaming\gcloud\credentials) is not writable. Opening in read-only mode. Any refreshed credentials will only be valid for this run.
Traceback (most recent call last):
  File "formatter_debug.py", line 133, in <module>
    p.run()
  File "C:\Miniconda3\envs\beam\lib\site-packages\apache_beam\pipeline.py", line 159, in run
    return self.runner.run(self)
    ....
    ....
    ....
  File "C:\Miniconda3\envs\beam\lib\sitepackages\apache_beam\runners\dataflow_runner.py", line 172, in run
    self.dataflow_client.create_job(self.job))    
  StockPickler.save_global(pickler, obj)
  File "C:\Miniconda3\envs\beam\lib\pickle.py", line 754, in save_global (obj, module, name)) 
  pickle.PicklingError: Can't pickle <class 'apache_beam.internal.clients.dataflow.dataflow_v1b3_messages.TypeValueValuesEnum'>: it's not found as apache_beam.internal.clients.dataflow.dataflow_v1b3_messages.TypeValueValuesEnum
4

3 回答 3

4

我发现当 Pipeline 对象包含在被腌制并发送到云的上下文中时,会引发您的错误:

pickle.PicklingError: Can't pickle <class 'apache_beam.internal.clients.dataflow.dataflow_v1b3_messages.TypeValueValuesEnum'>: it's not found as apache_beam.internal.clients.dataflow.dataflow_v1b3_messages.TypeValueValuesEnum

当然,你可能会问:

  1. 是什么使 Pipeline 对象在发送到云时无法腌制,因为通常它是可腌制的?
  2. 如果这确实是问题所在,那么我不会一直收到此错误 - 通常情况下,Pipeline 对象不是包含在发送到云的上下文中吗?
  3. 如果 Pipeline 对象通常不包含在发送到云的上下文中,那么为什么我的案例中包含 Pipeline 对象?

(1)

当您使用 调用p.run()Pipeline 时cloud=True,首先发生的事情之一p.runner.job=apiclient.Job(pipeline.options)就是在apache_beam.runners.dataflow_runner.DataflowPipelineRunner.run.

如果没有此属性集,管道是可腌制的。但是一旦设置了这个,管道就不再是可腌制的,因为 p.runner.job.proto._Message__tags[17]是 a TypeValueValuesEnum,它被定义为apache_beam.internal.clients.dataflow.dataflow_v1b3_messages. 无法腌制 AFAIK 嵌套类(即使是 dill - 请参阅如何在 python 中腌制嵌套类?)。

(2)-(3)

与直觉相反,Pipeline 对象通常不包含在发送到云的上下文中。当您使用 调用p.run()Pipeline 时cloud=True,只会腌制以下对象(请注意,腌制发生在p.runner.job设置之后):

  1. 如果save_main_session=True,则指定模块中的所有全局对象__main__都被腌制。(__main__是您从命令行运行的脚本)。
  2. 管道中定义的每个转换都是单独腌制的

在您的情况下,您遇到了 #1,这就是您的解决方案有效的原因。我实际上遇到了 #2,其中我将beam.Maplambda 函数定义为复合的方法PTransform。(当应用复合变换时,管道被添加为变换的属性......)我的解决方案是在模块中定义那些 lambda 函数。

一个长期的解决方案是让我们在 Apache Beam 项目中解决这个问题。待定!

于 2016-11-30T15:26:39.463 回答
2

这应该在 google-dataflow 0.4.4 sdk 版本中使用https://github.com/apache/incubator-beam/pull/1485修复

于 2016-12-13T19:37:02.343 回答
1

我通过将 main 的主体封装在 run() 方法中并调用 run() 解决了这个问题。

于 2016-10-26T13:44:30.143 回答