
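I am writing a Dataflow pipeline to process videos from a Google Cloud Storage bucket. The pipeline downloads each work item to the local system, then uploads the results back to a GCP bucket. This follows on from a previous question.

The pipeline works locally with the DirectRunner, but I am having trouble debugging it on the DataflowRunner.

The error reads: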

  File "run_clouddataflow.py", line 41, in process
  File "/usr/local/lib/python2.7/dist-packages/google/cloud/storage/blob.py", line 464, in download_to_file
    self._do_download(transport, file_obj, download_url, headers)
  File "/usr/local/lib/python2.7/dist-packages/google/cloud/storage/blob.py", line 418, in _do_download
    download.consume(transport)
  File "/usr/local/lib/python2.7/dist-packages/google/resumable_media/requests/download.py", line 101, in consume
    self._write_to_stream(result)
  File "/usr/local/lib/python2.7/dist-packages/google/resumable_media/requests/download.py", line 62, in _write_to_stream
    with response:
AttributeError: __exit__ [while running 'Run DeepMeerkat']

It occurs when trying to execute blob.download_to_file(file_obj) in the following code:

storage_client=storage.Client()
bucket = storage_client.get_bucket(parsed.hostname)
blob=storage.Blob(parsed.path[1:],bucket)

#store local path
local_path="/tmp/" + parsed.path.split("/")[-1]

print('local path: ' + local_path)
with open(local_path, 'wb') as file_obj:
  blob.download_to_file(file_obj)

print("Downloaded" + local_path)

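My guess is that workers are not allowed to write locally? Or perhaps there is no /tmp folder in the Dataflow container. Where should I write objects? It is hard to debug without access to the environment. Is it possible to access stdout from the workers for debugging (a serial console?)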
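On the debugging question: messages emitted through Python's standard logging module inside worker code are collected in the Dataflow job's worker logs, which is usually the closest substitute for a console. A minimal sketch, assuming a hypothetical helper local_path_for (not part of the original script) that picks a writable temp directory and logs what it resolved:

```python
import logging
import os
import tempfile

try:
    from urllib.parse import urlparse  # Python 3
except ImportError:
    from urlparse import urlparse  # Python 2, as used in the question


def local_path_for(gcs_url, base_dir=None):
    """Map gs://bucket/dir/name.ext to <base_dir>/name.ext.

    Hypothetical helper for illustration. base_dir defaults to the
    platform temp directory instead of a hard-coded "/tmp/".
    """
    parsed = urlparse(gcs_url)
    if base_dir is None:
        base_dir = tempfile.gettempdir()
    path = os.path.join(base_dir, os.path.basename(parsed.path))
    # logging output (unlike an interactive console) is kept with the job.
    logging.info("bucket=%s blob=%s local_path=%s",
                 parsed.netloc, parsed.path.lstrip("/"), path)
    return path
```

Inside process(), the result could replace the hand-built local_path before calling blob.download_to_file. The bucket and file names in any call are placeholders.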

Edit #1

I have tried passing credentials explicitly:

  try:
      credentials, project = google.auth.default()
  except:
      os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = known_args.authtoken
      credentials, project = google.auth.default()

as well as writing to cwd() instead of /tmp/:

local_path=parsed.path.split("/")[-1]

print('local path: ' + local_path)
with open(local_path, 'wb') as file_obj:
  blob.download_to_file(file_obj)

I still get the same cryptic error when downloading blobs from GCP.

The full pipeline script is below; setup.py is here.

import logging
import argparse
import json
import os
import csv
import apache_beam as beam
from urlparse import urlparse
from google.cloud import storage

## The namespace of the main module is not inherited by Cloud Dataflow workers.
## See https://cloud.google.com/dataflow/faq#how-do-i-handle-nameerrors; better to write ugly import statements than to miss a name.

class PredictDoFn(beam.DoFn):
  def process(self,element):

    import csv
    from google.cloud import storage
    from DeepMeerkat import DeepMeerkat
    from urlparse import urlparse
    import os
    import google.auth


    DM=DeepMeerkat.DeepMeerkat()

    print(os.getcwd())
    print(element)

    #try adding credentials?
    #set credentials, inherited from the worker
    credentials, project = google.auth.default()

    #download element locally
    parsed = urlparse(element[0])

    #parse gcp path
    storage_client=storage.Client(credentials=credentials)
    bucket = storage_client.get_bucket(parsed.hostname)
    blob=storage.Blob(parsed.path[1:],bucket)

    #store local path
    local_path=parsed.path.split("/")[-1]

    print('local path: ' + local_path)
    with open(local_path, 'wb') as file_obj:
      blob.download_to_file(file_obj)

    print("Downloaded" + local_path)

    #Assign input from DataFlow/manifest
    DM.process_args(video=local_path)
    DM.args.output="Frames"

    #Run DeepMeerkat
    DM.run()

    #upload back to GCS
    found_frames=[]
    for (root, dirs, files) in os.walk("Frames/"):
      # use a separate loop variable so the "files" list is not shadowed
      for filename in files:
        if filename.upper().endswith(".JPG"):
          found_frames.append(os.path.join(root, filename))

    for frame in found_frames:

      #create GCS path
      path="DeepMeerkat/" + parsed.path.split("/")[-1] + "/" + frame.split("/")[-1]
      blob=storage.Blob(path,bucket)
      blob.upload_from_filename(frame)

def run():
  import argparse
  import os
  import apache_beam as beam
  import csv
  import logging
  import google.auth

  parser = argparse.ArgumentParser()
  parser.add_argument('--input', dest='input', default="gs://api-project-773889352370-testing/DataFlow/manifest.csv",
                      help='Input file to process.')
  parser.add_argument('--authtoken', default="/Users/Ben/Dropbox/Google/MeerkatReader-9fbf10d1e30c.json",
                      help='Path to the credentials JSON file used for GOOGLE_APPLICATION_CREDENTIALS.')
  known_args, pipeline_args = parser.parse_known_args()

  #set credentials, inherited from the worker
  try:
      credentials, project = google.auth.default()
  except:
      os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = known_args.authtoken
      credentials, project = google.auth.default()

  p = beam.Pipeline(argv=pipeline_args)

  vids = (p | 'Read input' >> beam.io.ReadFromText(known_args.input)
       | 'Parse input' >> beam.Map(lambda line: next(csv.reader([line])))
       | 'Run DeepMeerkat' >> beam.ParDo(PredictDoFn()))

  logging.getLogger().setLevel(logging.INFO)
  p.run()

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

1 Answer

I talked with the google-cloud-storage package maintainer, and this is a known issue. Updating the pinned versions in my setup.py to

REQUIRED_PACKAGES = ["google-cloud-storage==1.3.2","google-auth","requests>=2.18.0"]

solved the problem.

https://github.com/GoogleCloudPlatform/google-cloud-python/issues/3836
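For context, the traceback fails on `with response:`, which only works if the response object implements the context manager protocol. As far as I know, requests.Response gained that in 2.18.0, which would explain the `requests>=2.18.0` pin. A minimal sketch for checking this on a worker; the helper names are hypothetical and the version parsing is deliberately crude:

```python
import io
import re


def supports_with(obj):
    """True if obj's type defines both __enter__ and __exit__."""
    cls = type(obj)
    return hasattr(cls, "__enter__") and hasattr(cls, "__exit__")


def version_tuple(version):
    """Turn '2.18.0' into (2, 18, 0).

    Only the leading digits of each dot-separated piece are used,
    so '2.18.0rc1' is treated as (2, 18, 0).
    """
    parts = []
    for piece in version.split("."):
        match = re.match(r"\d+", piece)
        if match:
            parts.append(int(match.group(0)))
    return tuple(parts)


def at_least(installed, minimum):
    """Compare two dotted version strings numerically."""
    return version_tuple(installed) >= version_tuple(minimum)


if __name__ == "__main__":
    # File-like objects from io implement the protocol.
    print(supports_with(io.BytesIO()))
    print(at_least("2.9.1", "2.18.0"))
```

On a worker, something like `at_least(requests.__version__, "2.18.0")` could be logged at the start of process() to confirm which version was actually installed.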

Answered 2017-08-17T23:45:30.937