3

我想为 protobufs 创建通用解析器并在 GCP 的 Dataflow 中使用它。我已经读过我需要 Parser 类,我将它作为参数传递。

这是我写的代码:

public static DoFn<PubsubMessage, List<TableRow>> ProtoTransformation (Parser parser, Schema schema) {
        return new DoFn<PubsubMessage, List<TableRow>>() {
            @ProcessElement
            public void processElement(@Element PubsubMessage input, OutputReceiver<List<TableRow>> receiver, ProcessContext c) {
                List<TableRow> rows = new ArrayList<>();
                byte[] data = input.getPayload();
                Object result;
                try {
                    result = parser.parseFrom(data);
                } catch (InvalidProtocolBufferException e) {
                    StringWriter sw = new StringWriter();
                    e.printStackTrace(new PrintWriter(sw));
                    String exceptionAsString = sw.toString();
                    LOG.error("Could not load PROTO data " + exceptionAsString);

                    receiver.output(rows);
                }
            }
  };
}

我想这样使用它:

.apply(name, ParDo.of(ProtoClassTransformation.ProtoTransformation(parser, searchResultsSchema)))

问题是,当我想调用 parseFrom 方法时,它会抛出java.io.NotSerializableException。我尝试使用标记ProtoTransformation@Transient但它不起作用。

Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize DoFnAndMainOutput{doFn=parsers.ProtoClassTransformation$1@75f2099, mainOutputTag=Tag<output>}
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:57)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation.translateDoFn(ParDoTranslation.java:462)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation$1.translateDoFn(ParDoTranslation.java:160)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation.payloadForParDoLike(ParDoTranslation.java:695)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation.translateParDo(ParDoTranslation.java:156)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation$ParDoPayloadTranslator.translate(ParDoTranslation.java:111)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.PTransformTranslation.toProto(PTransformTranslation.java:206)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation.getParDoPayload(ParDoTranslation.java:547)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation.isSplittable(ParDoTranslation.java:557)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.PTransformMatchers$4.matches(PTransformMatchers.java:194)
    at org.apache.beam.sdk.Pipeline$2.visitPrimitiveTransform(Pipeline.java:278)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:668)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
    at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
    at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:256)
    at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:209)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:170)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:66)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
    at FlightMessageToBigQuery.main(FlightMessageToBigQuery.java:100)
Caused by: java.io.NotSerializableException: searchresultsproto.SearchResultProto$SearchResult$1
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
    ... 23 more
4

0 回答 0