Trying the DirectRunner
When I run my pipeline in Scio/Beam, I get the following error:
java.lang.IllegalArgumentException: unable to serialize DoFnWithExecutionInformation{doFn=anonymous function map@{Foo.scala:59}:1, mainOutputTag=Tag<output>, sideInputMapping={}, schemaInformation=DoFnSchemaInformation{elementConverters=[]}}
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:55)
at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.translateDoFn(ParDoTranslation.java:606)
at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation$1.translateDoFn(ParDoTranslation.java:216)
at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.payloadForParDoLike(ParDoTranslation.java:738)
at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.translateParDo(ParDoTranslation.java:212)
at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.translateParDo(ParDoTranslation.java:191)
at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation$ParDoTranslator.translate(ParDoTranslation.java:128)
at org.apache.beam.repackaged.direct_java.runners.core.construction.PTransformTranslation.toProto(PTransformTranslation.java:225)
at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.getParDoPayload(ParDoTranslation.java:689)
at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.isSplittable(ParDoTranslation.java:704)
at org.apache.beam.repackaged.direct_java.runners.core.construction.PTransformMatchers$6.matches(PTransformMatchers.java:269)
at org.apache.beam.sdk.Pipeline$2.visitPrimitiveTransform(Pipeline.java:282)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:665)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)
at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)
at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:460)
at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:260)
at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:210)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:170)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:315)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:301)
at com.spotify.scio.ScioContext.execute(ScioContext.scala:606)
at com.spotify.scio.ScioContext$$anonfun$run$1.apply(ScioContext.scala:594)
at com.spotify.scio.ScioContext$$anonfun$run$1.apply(ScioContext.scala:582)
at com.spotify.scio.ScioContext.requireNotClosed(ScioContext.scala:702)
at com.spotify.scio.ScioContext.run(ScioContext.scala:582)
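Here is the code:
import com.google.cloud.storage.{BlobId, Storage}

object Foo {
...
// map is used only for its side effect; the closure captures gcpStorage
scol.map { case (day, labeled, unlabeled) => writeFoo(gcpStorage, BlobId.of(bucket, path)) }
...
def writeFoo(gcpStorage: Storage, blobId: BlobId): Unit = {
...
}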
I'm not sure what the actual problem is. I tried inlining writeFoo, but it still doesn't work.
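One likely cause, offered as a hedged sketch rather than a confirmed diagnosis: before running the pipeline, Beam Java-serializes each DoFn (the `SerializableUtils.serializeToByteArray` frame at the top of the trace), so every value the `map` closure captures, here `gcpStorage`, must be serializable as well. Inlining `writeFoo` would not change that, because the closure still captures `gcpStorage`. The self-contained program below reproduces that check with plain `java.io.ObjectOutputStream`. `FakeStorage`, `CapturingFn`, and `LazyClientFn` are hypothetical stand-ins, and the sketch assumes the real `Storage` client is not serializable.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializationCheck {
  // Hypothetical stand-in for a client such as com.google.cloud.storage.Storage:
  // it deliberately does NOT implement java.io.Serializable.
  class FakeStorage {
    def write(path: String): Unit = ()
  }

  // Roughly the check Beam performs on a DoFn: standard Java serialization.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // Holds the client as a field, like a closure that captures `gcpStorage`.
  final class CapturingFn(storage: FakeStorage) extends (String => Unit) with Serializable {
    def apply(path: String): Unit = storage.write(path)
  }

  // Creates the client lazily where it is used; the transient field is
  // skipped by serialization, so only the (empty) function object is written.
  final class LazyClientFn extends (String => Unit) with Serializable {
    @transient private lazy val storage: FakeStorage = new FakeStorage
    def apply(path: String): Unit = storage.write(path)
  }

  def main(args: Array[String]): Unit = {
    println(isSerializable(new CapturingFn(new FakeStorage))) // false
    println(isSerializable(new LazyClientFn))                 // true
  }
}
```

If this is the cause, the usual workaround follows the same idea as `LazyClientFn`: create the `Storage` client on the worker (for example in a `@transient lazy val` or in a `DoFn` `@Setup` method) and let the closure capture only serializable values such as the bucket name and path.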