我正在尝试使用 Cloud Dataflow(Beam Python SDK)将其读取并写入 BigQuery。
读取和写入 2000 万条记录(约 80 MB)大约需要 30 分钟。
查看数据流 DAG,我可以看到将每个 CSV 行转换为 BQ Row 花费了大部分时间。
下面是执行相同操作的代码片段:
beam.Map(lambda s: data_ingestion.parse_record_string(s,data_ingestion.stg_schema_dict))
def parse_record_string(self, string_input,schema_dict):
for idx,(x,key) in enumerate(zip(imm_input,schema_dict)):
key = key.strip()
datatype = schema_dict[key].strip()
if key == 'HASH_ID' and datatype != 'STRING':
hash_id = hash(''.join(imm_input[1:idx]))
row_dict[key] = hash_id
else:
if x:
x = x.decode('utf-8').strip()
row_dict[key] = x
else:
row_dict[key] = None
#row_dict[key] = ''
return row_dict
除了 map transform ,我还使用了 ParDo 和 Flatmap。所有这些都产生相同的结果。
请提出任何可能的调整以减少时间。
提前致谢
