I want to create a generic parser for protobufs and use it in Dataflow on GCP. I've read that I need the Parser class, so I pass it in as a parameter.
Here is the code I wrote:
public static DoFn<PubsubMessage, List<TableRow>> ProtoTransformation(Parser<?> parser, Schema schema) {
    // The anonymous DoFn below uses `parser`, so it is serialized together with the DoFn.
    return new DoFn<PubsubMessage, List<TableRow>>() {
        @ProcessElement
        public void processElement(@Element PubsubMessage input, OutputReceiver<List<TableRow>> receiver) {
            List<TableRow> rows = new ArrayList<>();
            byte[] data = input.getPayload();
            try {
                Object result = parser.parseFrom(data);
                // Converting `result` into TableRows using `schema` is not shown here.
            } catch (InvalidProtocolBufferException e) {
                // Hand the exception to the logger instead of printing the stack trace into a String.
                LOG.error("Could not load PROTO data", e);
            }
            receiver.output(rows);
        }
    };
}
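Why a DoFn shaped like this cannot be serialized: the anonymous class refers to the method parameter `parser`, so the Java compiler copies that reference into a hidden field of the anonymous class, and Beam serializes the DoFn with plain Java serialization (SerializableUtils.serializeToByteArray in the stack trace). The JDK-only sketch below reproduces the effect without Beam or protobuf; `PlainParser`, `SerializableFn`, `wrap` and `wrapPrefix` are hypothetical stand-ins for the generated parser and the DoFn.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

public class CaptureDemo {

    // Hypothetical stand-in for the generated protobuf parser: NOT Serializable.
    static class PlainParser {
        String parse(byte[] data) {
            return new String(data, StandardCharsets.UTF_8);
        }
    }

    // Hypothetical stand-in for DoFn, which implements Serializable.
    interface SerializableFn extends Serializable {
        String apply(byte[] data);
    }

    // Mirrors ProtoTransformation: the anonymous class uses `parser`, so the
    // compiler stores it in a synthetic field that is serialized as well.
    static SerializableFn wrap(PlainParser parser) {
        return new SerializableFn() {
            @Override
            public String apply(byte[] data) {
                return parser.parse(data);
            }
        };
    }

    // Same shape, but it only captures a String, which is Serializable.
    static SerializableFn wrapPrefix(String prefix) {
        return new SerializableFn() {
            @Override
            public String apply(byte[] data) {
                return prefix + new String(data, StandardCharsets.UTF_8);
            }
        };
    }

    // Attempts Java serialization, the mechanism Beam uses for DoFns.
    static boolean canSerialize(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(canSerialize(wrap(new PlainParser()))); // false
        System.out.println(canSerialize(wrapPrefix("row: ")));     // true
    }
}
```

Because ProtoTransformation is a static method, the anonymous class has no reference to an enclosing instance; the only problematic field is the captured parser.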
I want to use it like this:
.apply(name, ParDo.of(ProtoClassTransformation.ProtoTransformation(parser, searchResultsSchema)))
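Whatever is passed into ParDo.of here has to survive Java serialization, including every object the DoFn references. One way to keep a "generic parser" argument is to make the argument itself a Serializable functional interface. The sketch below uses a hypothetical `SerializableParser` interface (not a Beam or protobuf type) and a round trip through ObjectOutputStream/ObjectInputStream to approximate what a runner does.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

public class SerializableParserDemo {

    // Hypothetical contract: a parse function that is itself Serializable,
    // so a DoFn holding one can be serialized.
    @FunctionalInterface
    interface SerializableParser<T> extends Serializable {
        T parse(byte[] data) throws IOException;
    }

    // Serialize and deserialize, roughly what happens when a DoFn is shipped.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T value) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    static String demo() {
        try {
            // Serializable: the target interface extends Serializable and the lambda captures nothing.
            SerializableParser<String> utf8 = data -> new String(data, StandardCharsets.UTF_8);
            SerializableParser<String> copy = roundTrip(utf8);
            return copy.parse("hello".getBytes(StandardCharsets.UTF_8));
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // hello
    }
}
```

With protobuf, a method reference such as SearchResultProto.SearchResult::parseFrom should fit this shape, since generated messages have a static parseFrom(byte[]) and InvalidProtocolBufferException extends IOException; a reference to a static method captures no state. Beam's own SerializableFunction follows the same idea, but its apply method does not declare checked exceptions.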
The problem is that once I call parseFrom inside the DoFn, running the pipeline fails with java.io.NotSerializableException. I tried marking ProtoTransformation with @Transient, but it didn't help.
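`@Transient` is an annotation (for example `java.beans.Transient`, or JPA's `javax.persistence.Transient`), and Java serialization ignores annotations: it only skips fields declared with the `transient` keyword. That keyword applies to fields, not to a method such as ProtoTransformation, and the variable captured by an anonymous class lives in a compiler-generated field that cannot be marked at all. The JDK-only sketch below shows the keyword's effect on a named class; `Holder` and `PlainParser` are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class TransientKeywordDemo {

    // Hypothetical stand-in for a non-serializable object such as a generated parser.
    static class PlainParser { }

    static class Holder implements Serializable {
        private static final long serialVersionUID = 1L;
        String messageType = "SearchResult";               // regular field: serialized
        transient PlainParser parser = new PlainParser();  // `transient` keyword: skipped
    }

    static Holder roundTrip(Holder holder) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(holder); // succeeds: the non-serializable field is skipped
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Holder) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Holder copy = roundTrip(new Holder());
        System.out.println(copy.messageType); // SearchResult
        System.out.println(copy.parser);      // null: field initializers do not run on deserialization
    }
}
```

A transient field comes back as null, so a DoFn using this approach would need to recreate the object on the worker, for example in a method annotated with Beam's @Setup.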
Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize DoFnAndMainOutput{doFn=parsers.ProtoClassTransformation$1@75f2099, mainOutputTag=Tag<output>}
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:57)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation.translateDoFn(ParDoTranslation.java:462)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation$1.translateDoFn(ParDoTranslation.java:160)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation.payloadForParDoLike(ParDoTranslation.java:695)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation.translateParDo(ParDoTranslation.java:156)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation$ParDoPayloadTranslator.translate(ParDoTranslation.java:111)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.PTransformTranslation.toProto(PTransformTranslation.java:206)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation.getParDoPayload(ParDoTranslation.java:547)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation.isSplittable(ParDoTranslation.java:557)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.PTransformMatchers$4.matches(PTransformMatchers.java:194)
at org.apache.beam.sdk.Pipeline$2.visitPrimitiveTransform(Pipeline.java:278)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:668)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:256)
at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:209)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:170)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:66)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
at FlightMessageToBigQuery.main(FlightMessageToBigQuery.java:100)
Caused by: java.io.NotSerializableException: searchresultsproto.SearchResultProto$SearchResult$1
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
... 23 more
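The "Caused by" line names searchresultsproto.SearchResultProto$SearchResult$1. In protobuf-java generated code this is most likely the anonymous parser class stored in the message's PARSER field, i.e. the Parser object passed into ProtoTransformation, which does not implement Serializable. A common workaround is to pass something serializable that identifies the message type, such as its Class, and look the parser up again on the worker. The JDK-only sketch below simulates this; `FakeMessage`, its static `parser()` method and `ParserByClass` are hypothetical stand-ins, not the protobuf API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

public class ParserByClassDemo {

    // Hypothetical stand-in for a generated message class with a static parser() factory.
    public static class FakeMessage {
        final String text;

        FakeMessage(String text) {
            this.text = text;
        }

        public static Function<byte[], FakeMessage> parser() {
            return data -> new FakeMessage(new String(data, StandardCharsets.UTF_8));
        }
    }

    // Only the Class is serialized (java.lang.Class is Serializable); the parser
    // is transient and looked up again on first use after deserialization.
    static class ParserByClass<T> implements Serializable {
        private static final long serialVersionUID = 1L;
        private final Class<T> messageClass;
        private transient Function<byte[], T> parser;

        ParserByClass(Class<T> messageClass) {
            this.messageClass = messageClass;
        }

        @SuppressWarnings("unchecked")
        T parse(byte[] data) {
            if (parser == null) {
                try {
                    // invoke(null) calls the static parser() method of messageClass.
                    parser = (Function<byte[], T>) messageClass.getMethod("parser").invoke(null);
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException("No usable static parser() on " + messageClass, e);
                }
            }
            return parser.apply(data);
        }
    }

    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ParserByClass<FakeMessage> copy = roundTrip(new ParserByClass<>(FakeMessage.class));
        System.out.println(copy.parse("hi".getBytes(StandardCharsets.UTF_8)).text); // hi
    }
}
```

With real generated classes, the reflective call would have to target what the generated code actually exposes; protobuf-java messages usually provide a static parser() method, and getDefaultInstance().getParserForType() is another route, but check your generated sources. In a Beam DoFn the lookup would more typically go in an @Setup method than be done lazily.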