I have been struggling with this for a month, but I cannot write data to a GCS bucket using WriteToAvro.
from __future__ import absolute_import
from __future__ import division
from datetime import datetime, timedelta, date
import argparse
import logging
import re
import os
import sys
import json
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam.io.avroio import WriteToAvro
from apache_beam.runners.runner import PipelineState
from apache_beam.io.gcp import bigquery_tools
import threading
import time
import types
from fastavro import parse_schema
from google.cloud import bigquery
import google.cloud.logging
class CustomParams(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--projectname'
                                           , type=str
                                           , help='BigQuery project name to download data from')
        parser.add_value_provider_argument('--jobname'
                                           , type=str
                                           , help='Dataflow JobName '
                                                  'format: Jobname')
        #parser.add_value_provider_argument('--input_query'
        #                                   , help='Dataflow Input Subscription Name '
        #                                          'format: input_subscription')
        parser.add_value_provider_argument('--output_path'
                                           , type=str
                                           , help='GCS path and name of File '
                                                  'format: gs://BUCKET_NAME/FOLDER_NAME/FILE_NAME')
QUERY_BODY = "pubsub_id AS CNTCT_EVENT_ID"
schema = {
    "name": "Transaction",
    "type": "record",
    "fields": [
        {"name": "CNTCT_EVENT_ID", "type": ["null", "long"]}
    ]
}
schema_parsed = parse_schema(schema)
def run(argv=None):
    options = PipelineOptions()
    p = beam.Pipeline(options=options)
    known_args = options.view_as(CustomParams)
    options.view_as(WorkerOptions).use_public_ips = False
    options.view_as(SetupOptions).save_main_session = True
    gcp_project = options.view_as(GoogleCloudOptions).project
    if 'work-dv' in gcp_project:
        select_query = "SELECT {0} \
            FROM `cio-datahub-work-dv-c03a6c.work_cust_intractn.usage_event_content_append`".format(QUERY_BODY)
    elif 'work-qa' in gcp_project:
        select_query = "SELECT {0} \
            FROM `cio-datahub-work-dv-c03a6c.work_cust_intractn.usage_event_content_append`".format(QUERY_BODY)
    rows = p | 'READ FROM Table' >> beam.io.Read(beam.io.BigQuerySource(query=select_query, use_standard_sql=True))
    writeDataToAvro = (rows
                       | 'WRITE TO AvroFile' >> WriteToAvro(known_args.output_path
                                                            , schema_parsed
                                                            , file_name_suffix='.avro'
                                                            , use_fastavro=True
                                                            )
                       )
    result = p.run()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
I have tried the following things (a sketch of the schema-parsing points follows this list):
- Reading the schema with avro.schema.Parse -> gives an error that the schema cannot be read: "JSON should be str, bytes or bytearray, not dict".
- Working around that by passing the schema as a """schema_given_in_code""" string. When running the code I then got an error saying fastavro is being used while the schema is an avro one.
- Working around that error by setting use_fastavro=False, after which it got stuck at ImmutableDict {}.
- Defining an AvroFileSink() and then using WriteToFiles(path=job_options.outputLocation, sink=sink), but again it only created the files and wrote no data into them, as in "Beam streaming pipeline does not write files to bucket".
- Converting the rows that were read to JSON, which produced an error in fastavro._write (Writer.write -> write_data, line 581 of "fastavro/_write.pyx" -> line 335 -> write_record, line 276) ending in: AttributeError: 'str' object has no attribute 'get'.
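For what it's worth, the two schema parsers really do expect different input types, and WriteToAvro expects each element to be a dict keyed by the Avro field names. A minimal sketch of those points, reusing the schema from the pipeline code above (the to_record helper is hypothetical, purely for illustration):

import json
from avro.schema import Parse
from fastavro import parse_schema

schema_dict = {
    "name": "Transaction",
    "type": "record",
    "fields": [{"name": "CNTCT_EVENT_ID", "type": ["null", "long"]}]
}

# avro.schema.Parse wants a JSON *string*, hence the
# "JSON should be str, bytes or bytearray, not dict" error above:
avro_schema = Parse(json.dumps(schema_dict))   # pairs with use_fastavro=False

# fastavro.parse_schema wants a plain *dict*:
fastavro_schema = parse_schema(schema_dict)    # pairs with use_fastavro=True

# WriteToAvro writes dict records; handing it JSON strings is what triggers
# "'str' object has no attribute 'get'" in fastavro._write.write_record.
def to_record(row):   # hypothetical helper, not from the original code
    return {'CNTCT_EVENT_ID': row.get('CNTCT_EVENT_ID')}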
Everything runs fine in the Jupyter Lab environment, but as soon as I create a template from it, it fails. I don't know why. Could someone please help me with this?
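One difference between the two versions that may matter: in the template pipeline, --output_path is a RuntimeValueProvider, whose value only exists once the job actually runs. A sketch of the distinction, assuming Beam's classic-template behaviour (this is a guess at the failure mode, not a confirmed diagnosis):

from apache_beam.io.avroio import WriteToAvro

# known_args.output_path is a RuntimeValueProvider: at template *creation* time
# it has no value, and calling .get() on it during graph construction raises
# "...not called from a runtime context".
output = known_args.output_path   # reuses known_args from the template code above
# output.get()                    # would fail at template creation time

# As far as I understand, file-based sinks such as WriteToAvro accept the
# ValueProvider object itself as the path, so passing it straight through (as
# the template code already does) is the supported pattern. Anything that
# formats the path eagerly (e.g. the datetime.now() suffix in the Jupyter
# version) cannot be applied to a ValueProvider while building the graph.
rows | 'WRITE TO AvroFile' >> WriteToAvro(output, schema_parsed, file_name_suffix='.avro')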
Edit: As requested, the Jupyter Lab code that runs with the DirectRunner:
from __future__ import absolute_import
from __future__ import division
import argparse
import logging
import sys
import os
import re
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.io.avroio import WriteToAvro
from fastavro import parse_schema
from avro.schema import Parse
from datetime import datetime
import json
import threading
import time
import types
from apache_beam.runners.runner import PipelineState
#pip install --upgrade fastavro <- Need to do this, else will get a "Writer not found" error
#pip install --upgrade avro-python3 <- Need to do this, or else will get an error due to the schema
PROJECT_ID = 'cio-sea-team-lab-9e09db'
BUCKET = 'cio-sea-team-lab-9e09db-ecp-project'
# load the Service Account json file to allow GCP resources to be used
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = "cio-sea-team-lab-9e09db-0b2782fa290b.json"
class Printer(beam.DoFn):
    """To print the output on the console using ParDo"""
    def process(self, data_item):
        print(data_item)
select_query = 'SELECT \
pubsub_id AS CNTCT_EVENT_HEADER_ID \
FROM `cio-sea-team-lab-9e09db.ecp_project.outbound_contact_event_pubsub`'
#WORKS WITH AVRO.SCHEMA.PARSE
#SCHEMA = """{"namespace": "example.avro"
#          , "name": "Transaction"
#          , "type": "record"
#          , "fields": [
#                {"name": "CNTCT_EVENT_HEADER_ID", "type": ["null", "int"]}
#            ]
#          }"""
#schema_parsed = Parse(SCHEMA)
#WORKS WITH FASTAVRO.PARSE_SCHEMA CURRENTLY IN USE
SCHEMA = {"namespace": "example.avro"
          , "name": "Transaction"
          , "type": "record"
          , "fields": [
                {"name": "CNTCT_EVENT_HEADER_ID", "type": ["null", "int"]}
            ]
          }
schema_parsed = parse_schema(SCHEMA)
def run(argv=None):
    pipeline_args = [
        '--project={0}'.format(PROJECT_ID),
        '--job_name=bq-to-bq-dtl',
        '--region=northamerica-northeast1',
        '--save_main_session',
        '--staging_location=gs://{0}/misc-temp/'.format(BUCKET),
        '--temp_location=gs://{0}/misc-temp/'.format(BUCKET),
        '--subnetwork=https://www.googleapis.com/compute/alpha/projects/cio-sea-team-lab-9e09db/regions/northamerica-northeast1/subnetworks/sealabsubnet',
        '--runner=DirectRunner'
    ]
    #parser = argparse.ArgumentParser()
    #known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(WorkerOptions).use_public_ips = False
    #with beam.Pipeline(options=pipeline_options) as p:
    p = beam.Pipeline(options=pipeline_options)
    # Read the table rows into a PCollection.
    rows = p | 'READ FROM Staging Table' >> beam.io.Read(beam.io.BigQuerySource(query=select_query
                                                                                , use_standard_sql=True))
    # Write the output using a "Write" transform that has side effects.
    x = (rows
         #|'Print' >> beam.ParDo(Printer())
         | 'WriteToAvro' >> WriteToAvro('gs://cio-sea-team-lab-9e09db-ecp-project/tgtoutput/CNTCT_EVENT_HEADER/CNTCT_EVENT_HEADER_GCP_' + datetime.now().strftime("%Y%m%d%H%M")
                                        , file_name_suffix='.avro'
                                        , schema=schema_parsed
                                        #, use_fastavro=False  # only needed when avro-python3 is used; otherwise omit
                                        )
         )
    p.run().wait_until_finish()
if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
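Since the last traceback above ends in write_record complaining about a str, one thing I would double-check is that every element reaching WriteToAvro is a plain dict keyed exactly like the Avro fields. A minimal sketch of an explicit conversion step that could sit between the read and the write (the ToAvroRecord name is hypothetical):

import apache_beam as beam

class ToAvroRecord(beam.DoFn):
    """Force each BigQuery row into a plain dict matching the Avro schema."""
    def process(self, row):
        # beam.io.BigQuerySource yields dicts keyed by column name; anything
        # serialised to a JSON string first is what triggers
        # "'str' object has no attribute 'get'" inside fastavro.
        yield {'CNTCT_EVENT_HEADER_ID': row.get('CNTCT_EVENT_HEADER_ID')}

# Usage between the read and the write:
# records = rows | 'To Avro record' >> beam.ParDo(ToAvroRecord())
# records | 'WriteToAvro' >> WriteToAvro(..., schema=schema_parsed)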