2

I am looking at the documentation and the provided examples to find out how I can report invalid data while processing data with Google's dataflow service.

Pipeline p = Pipeline.create(options);
p.apply(TextIO.Read.named("ReadMyFile").from(options.getInput()))
 .apply(new SomeTransformation())
 .apply(TextIO.Write.named("WriteMyFile").to(options.getOutput()));
p.run();

In addition to the actual in-/output, I want to produce a 2nd output file that contains records that which are considered invalid (e.g. missing data, malformed data, values were too high). I want to troubleshoot those records and process them separately.

  • Input: gs://.../input.csv
  • Output: gs://.../output.csv
  • List of invalid records: gs://.../invalid.csv

How can I redirect those invalid records into a separate output?

4

2 回答 2

3

您可以使用PCollectionTuples从单个转换中返回多个 PCollection。例如,

TupleTag<String> mainOutput = new TupleTag<>("main");
TupleTag<String> missingData = new TupleTag<>("missing");
TupleTag<String> badValues = new TupleTag<>("bad");

Pipeline p = Pipeline.create(options);
PCollectionTuple all = p
   .apply(TextIO.Read.named("ReadMyFile").from(options.getInput()))
   .apply(new SomeTransformation());

all.get(mainOutput)
   .apply(TextIO.Write.named("WriteMyFile").to(options.getOutput()));
all.get(missingData)
   .apply(TextIO.Write.named("WriteMissingData").to(...));
...

PCollectionTuples 既可以直接从现有的 PCollection 中构建,也可以从 ParDo 操作中发出,带有侧输出,例如

PCollectionTuple partitioned = input.apply(ParDo
    .of(new DoFn<String, String>() {
          public void processElement(ProcessContext c) {
             if (checkOK(c.element()) {
                 // Shows up in partitioned.get(mainOutput).
                 c.output(...);
             } else if (hasMissingData(c.element())) {
                 // Shows up in partitioned.get(missingData).
                 c.sideOutput(missingData, c.element());
             } else {
                 // Shows up in partitioned.get(badValues).
                 c.sideOutput(badValues, c.element());
             }
          }
        })
    .withOutputTags(mainOutput, TupleTagList.of(missingData).and(badValues)));

请注意,通常各种侧输出不需要具有相同的类型,并且可以将数据发送到任意数量的侧输出(而不是我们这里的严格分区)。

您的 SomeTransformation 类可能看起来像

class SomeTransformation extends PTransform<PCollection<String>,
                                            PCollectionTuple> {
  public PCollectionTuple apply(PCollection<String> input) {
    // Filter into good and bad data.
    PCollectionTuple partitioned = ...
    // Process the good data.
    PCollection<String> processed =
        partitioned.get(mainOutput)
                   .apply(...)
                   .apply(...)
                   ...;
    // Repackage everything into a new output tuple.
    return PCollectionTuple.of(mainOutput, processed)
                           .and(missingData, partitioned.get(missingData))
                           .and(badValues, partitioned.get(badValues));
  }
}
于 2015-01-02T21:46:43.453 回答
0

Robert's suggestion of using sideOutputs is great, but note that this will only work if the bad data is identified by your ParDos. There currently isn't a way to identify bad records hit during initial decoding (where the error is hit in Coder.decode). We've got plans to work on that soon.

于 2015-01-03T00:29:38.020 回答