我需要从 GCS 中压缩的文件中解析 json 数据,因为文件扩展名为 .gz,因此它应该由数据流重新组织和正确处理,但是作业日志打印出不可读的字符和未处理的数据。当我处理未压缩的数据时,它工作正常。我使用以下方法来映射/解析 json:
ObjectMapper mapper = new ObjectMapper();
Map<String, String> eventDetails = mapper.readValue(c.element(),
new TypeReference<Map<String, String>>() {
});
知道可能是什么原因吗?
====================================
要添加有关如何从输入文件中读取的更多详细信息:
创建管道:
Poptions pOptions = PipelineOptionsFactory.fromArgs(args).withValidation().as(Poptions.class); Pipeline p = Pipeline.create(pOptions); p.apply(TextIO.Read.named("ReadLines").from(pOptions.getInput())) .apply(new Pimpression()) .apply(BigQueryIO.Write .to(pOptions.getOutput()) .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER) .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)); p.run();
运行时配置:
PROJECT="myProjectId" DATASET="myDataSetId" INPUT="gs://foldername/input/*" STAGING1="gs://foldername/staging" TABLE1="myTableName" mvn exec:java -pl example \ -Dexec.mainClass=com.google.cloud.dataflow.examples.Example1 \ -Dexec.args="--project=${PROJECT} --output=${PROJECT}:${DATASET}.${TABLE1} --input=${INPUT} --stagingLocation=${STAGING1} --runner=BlockingDataflowPipelineRunner"
输入文件名示例:file.gz,以及命令 gsutil ls -L gs://bucket/input/file.gz 的输出 | grep 内容- 是:
Content-Length: 483100 Content-Type: application/octet-stream