1

一个月以来我一直在努力,但我无法使用 WriteToAvro 将数据写入 GCS Bucket。

from __future__ import absolute_import
from __future__ import division

from datetime import datetime, timedelta, date
import argparse
import logging
import re
import os
import sys
import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam.io.avroio import WriteToAvro
from apache_beam.runners.runner import PipelineState
from apache_beam.io.gcp import bigquery_tools
import threading
import time
import types 
from fastavro import parse_schema

from google.cloud import bigquery
import google.cloud.logging

class CustomParams(PipelineOptions):
    """Custom runtime (ValueProvider) options for this job.

    Every option is registered with ``add_value_provider_argument``, so its
    value is exposed as a ValueProvider rather than a plain string.
    """

    @classmethod
    def _add_argparse_args(cls, parser):
        """Register the job's custom command-line options on *parser*."""
        # NOTE(review): --projectname and --jobname are not read anywhere in
        # the visible run(); confirm they are still needed.
        parser.add_value_provider_argument(
            '--projectname',
            type=str,
            help='BigQuery project name to dntl data from')
        # Adjacent string literals are concatenated by Python, so each piece
        # needs its own trailing separator to keep the help text readable.
        parser.add_value_provider_argument(
            '--jobname',
            type=str,
            help='Dataflow JobName. '
                 'format: Jobname')
        parser.add_value_provider_argument(
            '--output_path',
            type=str,
            help='GCS path and name of File. '
                 'format: gs://BUCKET_NAME/FOLDER_NAME/FILE_NAME')

# Column expression inserted after SELECT; its alias is the field name that
# the Avro schema below declares.
QUERY_BODY = "pubsub_id AS CNTCT_EVENT_ID"

# Avro record schema for the exported rows: a single nullable field
# (a union of "null" and "long").
# NOTE(review): confirm pubsub_id is an integer column in the source table;
# if it is a STRING, this field type must be "string" instead of "long".
schema = dict(
    name="Transaction",
    type="record",
    fields=[
        dict(name="CNTCT_EVENT_ID", type=["null", "long"]),
    ],
)

# fastavro-parsed copy of the schema; run() passes it to WriteToAvro together
# with use_fastavro=True.
schema_parsed = parse_schema(schema)

def run(argv=None):
    """Build and submit the BigQuery -> Avro-on-GCS pipeline.

    The source table is chosen from the GCP project in the pipeline options.
    The query result is read with BigQuerySource and written with WriteToAvro
    under ``--output_path``. The pipeline is submitted but not waited on.

    Args:
        argv: Optional list of command-line flags. ``None`` (the default)
            lets PipelineOptions parse ``sys.argv``, exactly as before.

    Returns:
        The result object returned by ``Pipeline.run()``.

    Raises:
        ValueError: if no project is configured, or the project name contains
            none of the known environment markers.
    """
    options = PipelineOptions(argv)
    # Finish configuring the options before the Pipeline is constructed.
    options.view_as(WorkerOptions).use_public_ips = False
    options.view_as(SetupOptions).save_main_session = True
    known_args = options.view_as(CustomParams)

    # Guard against an unset project: `'x' in None` would raise TypeError.
    gcp_project = options.view_as(GoogleCloudOptions).project or ''

    # (substring of the project name, fully qualified source table), checked
    # in order; the first match wins.
    # NOTE(review): the work-qa entry reads the *work-dv* table, exactly as the
    # original branches did -- confirm whether a QA table was intended.
    source_tables = (
        ('work-dv',
         'cio-datahub-work-dv-c03a6c.work_cust_intractn.usage_event_content_append'),
        ('work-qa',
         'cio-datahub-work-dv-c03a6c.work_cust_intractn.usage_event_content_append'),
    )
    select_query = None
    for marker, table in source_tables:
        if marker in gcp_project:
            select_query = 'SELECT {0} FROM `{1}`'.format(QUERY_BODY, table)
            break
    if select_query is None:
        # Fail with a clear message instead of an UnboundLocalError later on.
        raise ValueError(
            'Cannot pick a source table for project {0!r}: the project name '
            'must contain one of {1}'.format(
                gcp_project, [marker for marker, _ in source_tables]))

    p = beam.Pipeline(options=options)
    rows = p | 'READ FROM Table' >> beam.io.Read(
        beam.io.BigQuerySource(query=select_query, use_standard_sql=True))

    # output_path is declared with add_value_provider_argument in
    # CustomParams, so it is passed through as a ValueProvider.
    _ = (rows
         | 'WRITE TO AvroFile' >> WriteToAvro(
             known_args.output_path,
             schema_parsed,
             file_name_suffix='.avro',
             use_fastavro=True))

    return p.run()

if __name__ == '__main__':
    # Let INFO-level records through the root logger, then build and submit.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    run()

我尝试了以下事情:

  1. 使用 avro.schema.Parse 读取 schema -> 报错,无法读取 schema:JSON 对象必须是 str、bytes 或 bytearray,而不是 dict
  2. 为解决这个问题,改为在代码中用三引号字符串 """schema_given_in_code""" 定义 schema。但运行代码时又报错,提示 schema 是 avro 格式,而当前使用的是 fastavro
  3. 通过设置 use_fastavro = False 解决该错误后,又在 ImmutableDict {} 处报错
  4. 参照“Beam 流管道不将文件写入存储桶”一问,定义 AvroFileSink() 后改用 WriteToFiles(path=job_options.outputLocation, sink=sink),但结果同样只生成了文件,文件中并没有写入数据
  5. 尝试先把读取到的数据转换为 JSON,结果得到如下错误:
     ... in fastavro._write.Writer.write
     File "fastavro/_write.pyx", line 581, in fastavro._write.write_data
     File "fastavro/_write.pyx", line 335
     File "fastavro/_write.pyx", line 276, in fastavro._write.write_record
     AttributeError: 'str' object has no attribute 'get'

在 Jupyter Lab 环境中一切都能正常运行,但一旦创建模板就会失败,不知道为什么。请帮我解决这个问题。

编辑:应要求附上在 Jupyter Lab 中用 DirectRunner 运行的代码:

from __future__ import absolute_import
from __future__ import division

import argparse
import logging
import sys
import os
import re

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.io.avroio import WriteToAvro
from fastavro import parse_schema
from avro.schema import Parse

from datetime import datetime
import json

import threading
import time
import types   
from apache_beam.runners.runner import PipelineState

#pip install --upgrade fastavro <- Need to do this else will get Writer not found fo error
#pip install --upgrade avro-python3 <- Need to do this or else will get error due to schema

PROJECT_ID = 'cio-sea-team-lab-9e09db'
BUCKET = 'cio-sea-team-lab-9e09db-ecp-project'

# Point Google client libraries at the service-account key file -- but only
# when the caller has not already configured credentials in the environment.
# NOTE(review): the path is relative to the current working directory, and
# hard-coding a key file name in source is a security risk; prefer supplying
# it through the environment.
os.environ.setdefault(
    'GOOGLE_APPLICATION_CREDENTIALS',
    "cio-sea-team-lab-9e09db-0b2782fa290b.json")

class Printer(beam.DoFn):
    """Print each element to the console and pass it through unchanged.

    Meant as a debugging step. Because the element is re-emitted, the step can
    sit in front of a sink without starving it of data.
    """

    def process(self, data_item):
        print(data_item)
        # Re-emit the element: a process() that produces no output leaves the
        # downstream PCollection empty, so a following sink would write files
        # containing no rows.
        yield data_item
        
# Standard-SQL query; the column alias must match the Avro field name below.
select_query = ('SELECT '
                'pubsub_id AS CNTCT_EVENT_HEADER_ID '
                'FROM `cio-sea-team-lab-9e09db.ecp_project.outbound_contact_event_pubsub`')

# Alternative schema for avro-python3 (then pass use_fastavro=False to
# WriteToAvro):
#
#   SCHEMA = """{"namespace": "example.avro",
#                "name": "Transaction",
#                "type": "record",
#                "fields": [
#                    {"name": "CNTCT_EVENT_HEADER_ID", "type": ["null", "int"]}
#                ]}"""
#   schema_parsed = Parse(SCHEMA)
#
# It is kept as '#' comments because wrapping it in a triple-quoted string
# does not work: the schema's own triple quotes would end that string early.

# Schema currently in use: a plain dict parsed with fastavro, for the
# WriteToAvro call in run(), which does not set use_fastavro.
# NOTE(review): Avro "int" is 32-bit; use "long" if pubsub_id values can
# exceed 2**31 - 1.
SCHEMA = {"namespace": "example.avro",
          "name": "Transaction",
          "type": "record",
          "fields": [
              {"name": "CNTCT_EVENT_HEADER_ID", "type": ["null", "int"]},
          ]}
schema_parsed = parse_schema(SCHEMA)

def run(argv=None):
    """Export the staging-table query result to an Avro file on GCS.

    Runs on the DirectRunner with the built-in flags below, and blocks until
    the pipeline finishes.

    Args:
        argv: Optional list of extra command-line flags, appended after the
            built-in defaults (for ordinary valued options, the later flag
            wins). ``None`` keeps the previous behavior of using only the
            defaults.
    """
    pipeline_args = [
        '--project={0}'.format(PROJECT_ID),
        '--job_name=bq-to-bq-dtl',
        '--region=northamerica-northeast1',
        '--save_main_session',
        '--staging_location=gs://{0}/misc-temp/'.format(BUCKET),
        '--temp_location=gs://{0}/misc-temp/'.format(BUCKET),
        # NOTE(review): this subnetwork URL uses the compute "alpha" API path;
        # confirm it is the intended form.
        '--subnetwork=https://www.googleapis.com/compute/alpha/projects/cio-sea-team-lab-9e09db/regions/northamerica-northeast1/subnetworks/sealabsubnet',
        '--runner=DirectRunner',
    ]
    if argv:
        pipeline_args.extend(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(WorkerOptions).use_public_ips = False

    # Output file prefix, stamped with the minute at which the pipeline is
    # built.
    output_prefix = ('gs://{0}/tgtoutput/CNTCT_EVENT_HEADER/'
                     'CNTCT_EVENT_HEADER_GCP_'.format(BUCKET)
                     + datetime.now().strftime("%Y%m%d%H%M"))

    # Leaving the with-block runs the pipeline and waits for it to finish.
    with beam.Pipeline(options=pipeline_options) as p:
        # Read the query result rows into a PCollection.
        rows = p | 'READ FROM Staging Table' >> beam.io.Read(
            beam.io.BigQuerySource(query=select_query, use_standard_sql=True))

        # Write the rows as Avro (the step label is kept as in the original).
        _ = (rows
             | 'WriteToText' >> WriteToAvro(
                 output_prefix,
                 file_name_suffix='.avro',
                 schema=schema_parsed))

if __name__ == '__main__':
    # Show INFO-level log records from the root logger, then run the export.
    logging.getLogger().setLevel(level=logging.INFO)
    run()
4

0 回答 0