0

我需要从 GCS 中压缩的文件中解析 json 数据,因为文件扩展名为 .gz,因此它应该由数据流重新组织和正确处理,但是作业日志打印出不可读的字符和未处理的数据。当我处理未压缩的数据时,它工作正常。我使用以下方法来映射/解析 json:

        ObjectMapper mapper = new ObjectMapper();
        Map<String, String> eventDetails = mapper.readValue(c.element(),
                    new TypeReference<Map<String, String>>() {
                    });

知道可能是什么原因吗?

====================================

要添加有关如何从输入文件中读取的更多详细信息:

  1. 创建管道:

    Poptions pOptions = PipelineOptionsFactory.fromArgs(args).withValidation().as(Poptions.class);
    Pipeline p = Pipeline.create(pOptions);
    p.apply(TextIO.Read.named("ReadLines").from(pOptions.getInput()))                                          
     .apply(new Pimpression())
     .apply(BigQueryIO.Write
    .to(pOptions.getOutput())
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
    p.run();
    
  2. 运行时配置:

    PROJECT="myProjectId"
    DATASET="myDataSetId"
    INPUT="gs://foldername/input/*"
    STAGING1="gs://foldername/staging" 
    TABLE1="myTableName"
    mvn exec:java -pl example \
    -Dexec.mainClass=com.google.cloud.dataflow.examples.Example1 \
    -Dexec.args="--project=${PROJECT} --output=${PROJECT}:${DATASET}.${TABLE1}   --input=${INPUT} --stagingLocation=${STAGING1} --runner=BlockingDataflowPipelineRunner"
    
  3. 输入文件名示例:file.gz,以及命令 gsutil ls -L gs://bucket/input/file.gz 的输出 | grep 内容- 是:

    Content-Length:     483100
    Content-Type:       application/octet-stream
    
4

1 回答 1

1

在私下跟进后,我们确定此问题是由于使用了旧版本的 Dataflow SDK(支持 gzip 之前)造成的。由于 Dataflow 处于 alpha 阶段,并且 SDK 不断更新,因此请确保您使用的 SDK 版本是最新的(来自 Maven 中心或 GitHub)。

于 2015-03-16T02:10:48.637 回答