我在 GCS 中有一个 json 格式的大型数据集,我需要将其加载到 BigQuery 中。问题是 json 数据没有存储在 NdJson 中,而是存储在几个大的 json 文件中,其中 JSON 中的每个键实际上应该是 json 本身中的一个字段。
例如 - 以下 Json:
{
"johnny": {
"type": "student"
},
"jeff": {
"type": "teacher"
}
}
应该转换成
[
{
"name": "johnny",
"type": "student"
},
{
"name": "jeff",
"type": "teacher"
}
]
我正在尝试通过 Google Data Flow 和 Apache Beam 来解决它,但是性能很糟糕,因为 ech “Worker” 必须做很多工作:
class JsonToNdJsonDoFn(beam.DoFn):
def __init__(self, pk_field_name):
self.__pk_field_name = pk_field_name
def process(self, line):
for key, record in json.loads(line).items():
record[self.__pk_field_name] = key
yield record
我知道这可以通过将其实现为SplittableDoFn以某种方式解决- 但是 Python 中的实现示例并不清楚。我应该如何将此 DoFn 构建为可拆分的,以及如何将其用作管道的一部分?