3

我知道我应该有一个代码,但我还没有什么有用的。

我的 GCS 上有大约 300GB 的 JSON文件,gs://path/listings_all.json最终我试图将它导入 BigQuery,但它有一些错误的数据结构(我是mongoexport从 MongoDB 获取的)

无效的字段名称“$date”。字段只能包含字母、数字和下划线,以字母或下划线开头,长度最多为 128 个字符

所以,现在我的方法是以某种方式从 GCS 进程中逐行读取源文件,并使用 python API 将每个处理的行上传到 BigQuery。

在简单的阅读器下面,我用原始大文件中的 100 行样本进行了测试:

import json
from pprint import pprint

with open('schema_in_10.json') as f:
    for line in f:
        j_content = json.loads(line)

        # print(j_content['id'], j_content['city'], j_content['country'], j_content['state'], j_content['country_code'], j_content['smart_location'], j_content['address'], j_content['market'], j_content['neighborhood'])
        # // geo { lat, lng}'])
        print('------')
        pprint(j_content['is_location_exact'])
        pprint(j_content['zipcode'])
        pprint(j_content['name'])

您能否帮助我了解如何使用 Python3 从 Google Cloud Storage 逐行读取或流式传输巨大的 JSON?

4

4 回答 4

3

逐行读取它然后尝试流式传输到 BigQuery 不会在您的本地计算机上扩展到 300GB,并且您将很难获得这个工作 TBH。

有几个可扩展的选项:

  1. 编写 Cloud Dataflow 管道以从 GCS 读取您的文件(它将为您扩展并并行读取),更正字段名称,然后写入 BigQuery。见这里
  2. 使用 CSV 而不是 JSON 作为格式并使用未出现在数据中的分隔符将其直接加载到 BigQuery 中。这会将每条记录加载到单个字符串列中,然后您可以使用 BigQuery 的 JSON 函数来提取您需要的内容。见这里
于 2018-10-09T11:11:40.997 回答
3

这是 GCP Dataflow 中解决方案的示例实现,对应于接受的答案中的第一个建议。您需要在函数json_processor中实现 json 校正。您可以在Datalab笔记本中运行此代码。

# Datalab might need an older version of pip
# !pip install pip==9.0.3

import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions

project_id = 'my-project'
bigquery_dataset_name = 'testdataset' # needs to exist 
table_name = 'testtable'
bucket_name = 'my-bucket'
json_file_gcs_path = 'gs://path/to/my/file.json'
schema = "name:STRING,zipcode:STRING"

def json_processor(row):
    import json
    d = json.loads(row)
    return {'name': d['name'], 'zipcode': d['zipcode']}

options = beam.options.pipeline_options.PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = project_id
google_cloud_options.job_name = "myjob"
google_cloud_options.staging_location = 'gs://{}/binaries'.format(bucket_name)
google_cloud_options.temp_location = 'gs://{}/temp'.format(bucket_name)
options.view_as(StandardOptions).runner = 'DataflowRunner'
google_cloud_options.region = "europe-west1"

p = beam.Pipeline(options=options)

(p | "read_from_gcs" >> beam.io.ReadFromText(json_file_gcs_path)
   | "json_processor" >> beam.Map(json_processor)
   | "write_to_bq" >> beam.io.Write(beam.io.gcp.bigquery.BigQuerySink(table=table_name, 
                                                       dataset=bigquery_dataset_name, 
                                                       project=project_id, 
                                                       schema=schema, 
                                                       create_disposition='CREATE_IF_NEEDED',
                                                       write_disposition='WRITE_EMPTY'))
)

p.run()
于 2018-10-09T14:17:06.510 回答
3

smart_open现在支持流式传输 GCS 文件。

from smart_open import open

# stream from GCS
with open('gs://my_bucket/my_file.txt') as fin:
    for line in fin:
        print(line)

# stream content *into* GCS (write mode):
with open('gs://my_bucket/my_file.txt', 'wb') as fout:
    fout.write(b'hello world')
于 2020-01-27T18:06:38.683 回答
1

使用内置的 json 解析器逐行解析 json 文件是行不通的(当然,除非它实际上是一个“json 行”文档),因此您需要一个流式解析器

但是,虽然这将解决内存使用问题,但它不会修复无效的 json,所以你最好的办法是首先将无效的 json 源修复为纯文本文件,无论是在 python 或 usingsed或一些类似的工具中,然后使用增量解析器来解析您的内容。

def fixfile(sourcepath, destpath):
    with open(sourcepath) as source, open(destpath, "w") as dest:
        for line in source:
            # you may want to use a regexp if this simple solution
            # breaks something else
            line = line.replace("$date", "date")
            dest.write(line)
于 2018-10-09T10:36:57.510 回答