可以通过以下方式使用 Dataflow 读取 Cloud Storage 上未嵌套的 JSON 文件:
p.apply("read logfiles", TextIO.Read.from("gs://bucket/*").withCoder(TableRowJsonCoder.of()));
如果我只想将这些日志以最少的过滤写入 BigQuery,我可以使用像这样的 DoFn 来做到这一点:
private static class Formatter extends DoFn<TableRow,TableRow> {
@Override
public void processElement(ProcessContext c) throws Exception {
// .clone() since input is immutable
TableRow output = c.element().clone();
// remove misleading timestamp field
output.remove("@timestamp");
// set timestamp field by using the element's timestamp
output.set("timestamp", c.timestamp().toString());
c.output(output);
}
}
}
但是,我不知道如何以这种方式访问 JSON 文件中的嵌套字段。
- 如果 TableRow 包含一个
RECORD
namedr
,是否可以在不进一步序列化/反序列化的情况下访问其键/值? - 如果我需要使用库对自己进行序列化/反序列化,那么使用标准of代替来获得我以这种方式失去的一些性能
Jackson
是否更有意义?Coder
TextIO.Read
TableRowJsonCoder
编辑
这些文件是换行符分隔的,看起来像这样:
{"@timestamp":"2015-x", "message":"bla", "r":{"analyzed":"blub", "query": {"where":"9999"}}}
{"@timestamp":"2015-x", "message":"blub", "r":{"analyzed":"bla", "query": {"where":"1111"}}}