I'm trying to create JSON files containing batches of 100 records each, using an Apache Beam pipeline running as a Google Dataflow job.
I'm reading records from BigQuery and trying to create one JSON file per 100 records, i.e. batch_size = 100.
So when the Dataflow job reads 700 records from BQ, I expect 7 JSON files to be created, but I'm seeing more files than that, and the batch sizes don't match what I expect.
I expected the finish_bundle method to execute once, but I see it executing multiple times, producing JSON batch files with fewer than 100 records.
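For context, this is the pattern I'm relying on, reduced to a toy sketch (my real DoFn is further below): buffer records in process and flush the remainder in finish_bundle, which I expected to run once at the end of the job:

import logging
import apache_beam as beam

class LifecycleSketch(beam.DoFn):
    # Toy version of my batching pattern: buffer in process, flush in
    # finish_bundle. My expectation was that finish_bundle would run once,
    # at the very end of the job, with the final partial batch.
    def __init__(self):
        self.buffer = []

    def process(self, element):
        self.buffer.append(element)

    def finish_bundle(self):
        logging.info("flushing %d buffered elements", len(self.buffer))
        self.buffer = []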
Here are the log details from the current Dataflow execution:
batch id - 1 - length of batch (process) - 100 - file name - my_bucket/29_09_2021/jbatch_20210929_093207.json
batch id - 2 - length of batch (process) - 100 - file name - my_bucket/29_09_2021/jbatch_20210929_093239.json
batch id - 3 - length of batch (finish_bundle) - 43 - file name - my_bucket/29_09_2021/jbatch_20210929_093253.json
batch id - 1 - length of batch (process) - 100 - file name - my_bucket/29_09_2021/jbatch_20210929_093329.json
batch id - 2 - length of batch (finish_bundle) - 66 - file name - my_bucket/29_09_2021/jbatch_20210929_093349.json
batch id - 1 - length of batch (process) - 100 - file name - my_bucket/29_09_2021/jbatch_20210929_093423.json
batch id - 2 - length of batch (process) - 100 - file name - my_bucket/29_09_2021/jbatch_20210929_093454.json
batch id - 3 - length of batch (finish_bundle) - 61 - file name - my_bucket/29_09_2021/jbatch_20210929_093512.json
batch id - 1 - length of batch (finish_bundle) - 30 - file name - my_bucket/29_09_2021/jbatch_20210929_093525.json
I expect the JSON batch files to be created in the correct sequence and with the correct batch size, 100 records per file, like this:
batch id - 1 - length of batch (process) - 100 - file name - my_bucket/29_09_2021/jbatch_20210929_093207.json
batch id - 2 - length of batch (process) - 100 - file name - my_bucket/29_09_2021/jbatch_20210929_093239.json
batch id - 3 - length of batch (process) - 100 - file name - my_bucket/29_09_2021/jbatch_20210929_093253.json
batch id - 4 - length of batch (process) - 100 - file name - my_bucket/29_09_2021/jbatch_20210929_093329.json
batch id - 5 - length of batch (process) - 100 - file name - my_bucket/29_09_2021/jbatch_20210929_093349.json
batch id - 6 - length of batch (process) - 100 - file name - my_bucket/29_09_2021/jbatch_20210929_093423.json
batch id - 7 - length of batch (finish_bundle) - 100 - file name - my_bucket/29_09_2021/jbatch_20210929_093454.json
Here is my pipeline code, which creates the JSON batch files and stores them in a GCS bucket:
import os
import json
import apache_beam as beam
import logging
from datetime import datetime
from apache_beam.options.pipeline_options import PipelineOptions
class CreateJSONBatch:
    def execute_pipeline(self):
        try:
            query = "SELECT id, name, region, country, language, `pin-code` FROM `project.dataset.table` LIMIT 700"
            beam_options = {
                "project": "<project>",
                "region": "<region>",
                "job_name": "create_json_batch",
                "runner": "DataflowRunner",
                "temp_location": "gs://<bucket>/temp_location/",
                "setup_file": "./setup.py"
            }
            options = PipelineOptions(**beam_options, save_main_session=True)
            with beam.Pipeline(options=options) as pipeline:
                raw_data = (
                    pipeline | 'Read from BQ ' >> beam.io.ReadFromBigQuery(query=query,
                                                                           use_standard_sql=True)
                )
                _ = (raw_data | 'Create JSON batch files ' >> beam.ParDo(CreateBatch()))
        except Exception as e:
            logging.error(f"Exception in execute_pipeline - {str(e)}")
class CreateBatch(beam.DoFn):
    def __init__(self):
        self.project = None
        self.region = None
        self.batch_size = None
        self.data_bucket = None
        self.json_folder = None
        self.batch_id = 0
        self.json_batch = []

    def get_file_name(self):
        try:
            cur_time = datetime.now()
            date_folder = cur_time.strftime('%d_%m_%Y')
            file_name = cur_time.strftime('%Y%m%d_%H%M%S')
            file_name = os.path.join(self.data_bucket, self.json_folder, date_folder, f"jbatch_{file_name}.json")
            return file_name  # e.g. my_bucket/folder_to_store_json_files/29_09_2021/jbatch_20210929_060346.json
        except Exception as e:
            logging.error(f"Exception in CreateBatch.get_file_name - {str(e)}")
    def create_json_files(self, json_file):
        try:
            json_file = f"gs://{json_file}"
            # Note: this launches a separate pipeline (and Dataflow job) per batch.
            beam_options = {
                "project": self.project,
                "region": self.region,
                "runner": "DataflowRunner",
                "temp_location": "gs://<bucket>/temp_location/",
                "setup_file": "./setup.py"
            }
            options = PipelineOptions(**beam_options, save_main_session=True)
            with beam.Pipeline(options=options) as pipeline_for_json:
                _ = (
                    pipeline_for_json
                    | 'Create pcollection' >> beam.Create(self.json_batch)
                    | 'Write Output' >> beam.io.WriteToText(json_file, shard_name_template='')
                )
        except Exception as e:
            logging.error(f"Exception in CreateBatch.create_json_files - {str(e)}")
    def prep_data(self):
        try:
            formatted_json_batch = []
            for x in range(len(self.json_batch)):
                element = self.json_batch[x]
                modified_element = "<logic to modify the element JSON to the needed format>"
                # sample modified element
                # {
                #     "id": "",
                #     "name": "",
                #     "address": {
                #         "region": "",
                #         "country": "",
                #         "language": "",
                #         "pin-code": ""
                #     }
                # }
                formatted_json_batch.append(json.dumps(modified_element))
            return formatted_json_batch
        except Exception as e:
            logging.error(f"Exception in CreateBatch.prep_data - {str(e)}")
    def process(self, record):
        try:
            self.project = "<project>"
            self.region = "<region>"
            self.batch_size = 100
            self.data_bucket = "my_bucket"
            self.json_folder = "folder_to_store_json_files"
            if len(self.json_batch) < self.batch_size:
                self.json_batch.append(record)
            else:
                self.batch_id = self.batch_id + 1
                file_name = self.get_file_name()
                # prepare for push
                self.json_batch = self.prep_data()
                logging.info(msg=f"batch id - {self.batch_id} - length of batch (process) - {str(len(self.json_batch))} - file name - {file_name}")
                # write to JSON
                self.create_json_files(file_name)
                self.json_batch = []
                self.json_batch.append(record)
        except Exception as e:
            logging.error(f"Exception in CreateBatch.process - {str(e)}")
    def finish_bundle(self):
        try:
            self.batch_id = self.batch_id + 1
            if len(self.json_batch) > 0:
                file_name = self.get_file_name()
                # prepare for push
                self.json_batch = self.prep_data()
                logging.info(msg=f"batch id - {self.batch_id} - length of batch (finish_bundle) - {str(len(self.json_batch))} - file name - {file_name}")
                # write to JSON
                self.create_json_files(file_name)
        except Exception as e:
            logging.error(f"Exception in CreateBatch.finish_bundle - {str(e)}")
if __name__ == "__main__":
    create_batch = CreateJSONBatch()
    create_batch.execute_pipeline()
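(As an aside, inside create_json_files I also considered writing each batch directly with Beam's FileSystems API instead of launching a nested pipeline per batch; a minimal, untested sketch of that idea:)

from apache_beam.io.filesystems import FileSystems

# Untested sketch: write one batch file straight to GCS via Beam's
# FileSystems API instead of running a nested pipeline per batch.
def write_batch(json_file, lines):
    # json_file like "gs://my_bucket/.../jbatch_20210929_093207.json";
    # lines is the list of JSON strings returned by prep_data()
    handle = FileSystems.create(json_file)
    try:
        handle.write("\n".join(lines).encode("utf-8"))
    finally:
        handle.close()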
I'm not sure why finish_bundle is being called multiple times.
What modifications to my pipeline code would make the files get created with the given batch size?
EDIT: I tried executing the same program with DirectRunner, and it created the correct number of files.
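For reference, this is the kind of change I'm considering, as a rough, untested sketch: key every record onto a single dummy key and let Beam's built-in GroupIntoBatches produce the fixed-size batches instead of buffering inside my DoFn (WriteBatchFiles here is a hypothetical DoFn that would reuse my file-writing logic; options and query are as in my code above):

import apache_beam as beam

# Rough, untested sketch: let GroupIntoBatches do the batching instead of
# buffering records in instance state inside my DoFn.
with beam.Pipeline(options=options) as pipeline:
    _ = (
        pipeline
        | 'Read from BQ' >> beam.io.ReadFromBigQuery(query=query, use_standard_sql=True)
        | 'Add dummy key' >> beam.Map(lambda record: (0, record))  # single key for all records
        | 'Batch by 100' >> beam.GroupIntoBatches(100)             # emits (0, [up to 100 records])
        | 'Drop key' >> beam.Map(lambda kv: kv[1])
        | 'Write batch files' >> beam.ParDo(WriteBatchFiles())     # hypothetical DoFn
    )

Would that be the right direction, or is there a way to make my current DoFn flush exactly once?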