0

我正在尝试使用 Cloud Dataflow(Beam Python SDK)将其读取并写入 BigQuery。

读取和写入 2000 万条记录(约 80 MB)大约需要 30 分钟。

查看数据流 DAG,我可以看到将每个 CSV 行转换为 BQ Row 花费了大部分时间。

在此处输入图像描述

下面是执行相同操作的代码片段:

beam.Map(lambda s: data_ingestion.parse_record_string(s,data_ingestion.stg_schema_dict)) 

    def parse_record_string(self, string_input,schema_dict): 

        for idx,(x,key) in enumerate(zip(imm_input,schema_dict)):
            key = key.strip()
            datatype = schema_dict[key].strip()
            if key == 'HASH_ID' and datatype != 'STRING':
                hash_id = hash(''.join(imm_input[1:idx]))
                row_dict[key] = hash_id
            else:
                if x:
                    x = x.decode('utf-8').strip()
                    row_dict[key] = x
                else:
                    row_dict[key] = None
                    #row_dict[key] = ''
        return row_dict

除了 map transform ,我还使用了 ParDo 和 Flatmap。所有这些都产生相同的结果。

请提出任何可能的调整以减少时间。

提前致谢

4

1 回答 1

2

当您查看代码时,您的代码是计算密集型的。对于每条 20M 行,您执行:

  • 一个 for 循环(每行有多少个元素?)
  • 一个 zip 和枚举
  • 在循环的每个元素上
    • 您执行 2 个条带(在字符串上循环以删除空格)
    • 切片上的连接(这是 2 个循环)-> 这种情况多久为真?
    • 另一种情况下的另一个条带

Python 非常棒,非常方便,有很多助手。但是,请注意这种简单性的陷阱并正确评估算法的复杂性。

如果您了解 Java,请尝试一下。它可能会更有效率。

于 2019-09-24T02:03:35.597 回答