4

可以通过以下方式使用 Dataflow 读取 Cloud Storage 上未嵌套的 JSON 文件:

p.apply("read logfiles", TextIO.Read.from("gs://bucket/*").withCoder(TableRowJsonCoder.of()));

如果我只想将这些日志以最少的过滤写入 BigQuery,我可以使用像这样的 DoFn 来做到这一点:

private static class Formatter extends DoFn<TableRow,TableRow> {

        @Override
        public void processElement(ProcessContext c) throws Exception {

            // .clone() since input is immutable
            TableRow output = c.element().clone();

            // remove misleading timestamp field
            output.remove("@timestamp");

            // set timestamp field by using the element's timestamp
            output.set("timestamp", c.timestamp().toString());

            c.output(output);
        }
    }
}

但是,我不知道如何以这种方式访问​​ JSON 文件中的嵌套字段。

  1. 如果 TableRow 包含一个RECORDnamed r,是否可以在不进一步序列化/反序列化的情况下访问其键/值?
  2. 如果我需要使用库对自己进行序列化/反序列化,那么使用标准of代替来获得我以这种方式失去的一些性能Jackson是否更有意义?CoderTextIO.ReadTableRowJsonCoder

编辑

这些文件是换行符分隔的,看起来像这样:

{"@timestamp":"2015-x", "message":"bla", "r":{"analyzed":"blub", "query": {"where":"9999"}}}
{"@timestamp":"2015-x", "message":"blub", "r":{"analyzed":"bla", "query": {"where":"1111"}}}
4

1 回答 1

6

你最好的选择可能是做你在#2中描述的事情并直接使用杰克逊。让 TextIO 读取执行其构建的目的是最有意义的——使用字符串编码器从文件中读取行——然后使用 aDoFn来实际解析元素。类似于以下内容:

PCollection<String> lines = pipeline
  .apply(TextIO.from("gs://bucket/..."));
PCollection<TableRow> objects = lines
  .apply(ParDo.of(new DoFn<String, TableRow>() {
    @Override
    public void processElement(ProcessContext c) {
      String json = c.element();
      SomeObject object = /* parse json using Jackson, etc. */;
      TableRow row = /* create a table row from object */;
      c.output(row);
    }
  });

请注意,您也可以使用多个 ParDo 来执行此操作。

于 2017-02-02T22:38:11.037 回答