2

我在 GCS 中有一个 json 格式的大型数据集,我需要将其加载到 BigQuery 中。问题是 json 数据没有存储在 NdJson 中,而是存储在几个大的 json 文件中,其中 JSON 中的每个键实际上应该是 json 本身中的一个字段。

例如 - 以下 Json:

{
  "johnny": {
    "type": "student"
  }, 
  "jeff": {
    "type": "teacher"
  }
}

应该转换成

[ 
  {
    "name": "johnny",
    "type": "student"
  }, 
  {
    "name": "jeff",
    "type": "teacher"
  }
]

我正在尝试通过 Google Data Flow 和 Apache Beam 来解决它,但是性能很糟糕,因为 ech “Worker” 必须做很多工作:

class JsonToNdJsonDoFn(beam.DoFn):
    def __init__(self, pk_field_name):
        self.__pk_field_name = pk_field_name

    def process(self, line):
        for key, record in json.loads(line).items():
            record[self.__pk_field_name] = key
            yield record

我知道这可以通过将其实现为SplittableDoFn以某种方式解决- 但是 Python 中的实现示例并不清楚。我应该如何将此 DoFn 构建为可拆分的,以及如何将其用作管道的一部分?

4

1 回答 1

-2

您需要一种方法来指定要处理 json 文件的部分范围。例如,它可以是一个字节范围。

博客文章中的Avro 示例是一个很好的示例。就像是:

class MyJsonReader(DoFn):
  def process(filename, tracker=DoFn.RestrictionTrackerParam)
    with fileio.ChannelFactory.open(filename) as file:
      start, stop = tracker.current_restriction()
      # Seek to the first block starting at or after the start offset.
      file.seek(start)
      next_record_start = find_next_record(file, start)
      while start:
        # Claim the position of the current record
        if not tracker.try_claim(next_record_start):
          # Out of range of the current restriction - we're done.
          return
        # start will point to the end of the record that was read
        record, start = read_record(file, next_record_start)
        yield record

  def get_initial_restriction(self, filename):
    return (0, fileio.ChannelFactory.size_in_bytes(filename))

但是,json 没有明确的记录边界,因此如果您的工作必须从 548 字节开始,则没有明确的方法可以告诉您要转移多少。如果该文件实际上就是您在那里的文件,那么您可以跳过字节,直到您看到 pattern "<string>": {。然后从{.

于 2020-03-31T23:40:29.337 回答