0

我正在构建一个示例数据流管道,主要基于 https://cloud.google.com/dataflow/java-sdk/combine上的代码

但是当我运行我的代码时,我遇到了以下异常:

线程“主”java.lang.IllegalArgumentException 中的异常:无法在 com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils) 处序列化 com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$TestCombineDoFn@139982de .java:51) 在 com.google.cloud.dataflow.sdk.util.SerializableUtils.ensureSerializable(SerializableUtils.java:81) 在 com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.ensureSerializable(DirectPipelineRunner.java :784) 在 com.google.cloud.dataflow.sdk.transforms.ParDo.evaluateHelper(ParDo.java:1025) 在 com.google.cloud.dataflow.sdk.transforms.ParDo.evaluateSingleHelper(ParDo.java:963) 在com.google.cloud.dataflow.sdk.transforms.ParDo.access$000(ParDo.java:441) 在 com.google.cloud.dataflow.sdk.transforms.ParDo$1。com.google.cloud.dataflow.sdk.transforms.ParDo$1.evaluate(ParDo.java:946) 上的评估(ParDo.java:951) com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform (DirectPipelineRunner.java:611) 在 com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:200) 在 com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java :196) 在 com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:196) 在 com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:196) 在com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:196) 在 com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:109) 在 com.google。云。dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:204) 在 com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:584) 在 com.google.cloud.dataflow.sdk。 com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:70) 在 com.google.cloud.dataflow.sdk.Pipeline.run( Pipeline.java:145) 在 com.google.cloud.dataflow.examples.CalcMeanExample.main(CalcMeanExample.java:50) 原因:java.io.NotSerializableException: org.apache.avro.io.DecoderFactory 在 java.io。 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 中的 ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)java:1509) 在 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 在 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 在 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 在java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 在 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 在 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 在 java.io.ObjectOutputStream .writeObject(ObjectOutputStream.java:348) 在 com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:47) ... 20 更多writeObject0(ObjectOutputStream.java:1178) 在 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 在 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 在 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java :1432) 在 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 在 java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) 在 com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray( SerializableUtils.java:47) ... 20 更多writeObject0(ObjectOutputStream.java:1178) 在 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 在 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 在 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java :1432) 在 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 在 java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) 在 com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray( SerializableUtils.java:47) ... 20 更多1432) 在 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 在 com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils) .java:47) ... 20 更多1432) 在 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 在 com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils) .java:47) ... 20 更多

我的代码如下:

package com.google.cloud.dataflow.examples;

import java.io.Serializable;

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.coders.AvroCoder;
import com.google.cloud.dataflow.sdk.coders.DefaultCoder;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.Default;
import com.google.cloud.dataflow.sdk.options.DefaultValueFactory;
import com.google.cloud.dataflow.sdk.options.Description;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.Combine;
import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
import com.google.cloud.dataflow.sdk.values.PCollection;


public class CalcMeanExample 

{

public static void main(String[] args) 
{
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline p = Pipeline.create(options);

    PCollection<String> numbers = p.apply(TextIO.Read.named("ReadLines").withCoder(StringUtf8Coder.of()).from(options.getInput()));

    numbers.apply( ParDo.of( new DoFn<String,String>(){
        @Override
        public void processElement(DoFn<String, String>.ProcessContext c) throws Exception {

            System.out.println( c.element() );

        }
    }));

    PCollection<String> average = numbers.apply( Combine.globally( new AverageFn()));


    average.apply(TextIO.Write.named("WriteAverage")
            .to(options.getOutput())
            .withNumShards(options.getNumShards()));

    p.run();

    System.out.println( "done" );
}


public static class AverageFn extends CombineFn<String, AverageFn.Accum, String> {
        @DefaultCoder(AvroCoder.class)   
        public static class Accum implements Serializable {
         int sum = 0;
         int count = 0;
       }
       public Accum createAccumulator() { return new Accum(); }
       public void addInput(Accum accum, String input) {
           accum.sum += Integer.parseInt(input );
           accum.count++;
       }
       public Accum mergeAccumulators(Iterable<Accum> accums) {
         Accum merged = createAccumulator();
         for (Accum accum : accums) {
           merged.sum += accum.sum;
           merged.count += accum.count;
         }
         return merged;
       }
       public String extractOutput(Accum accum) {
         return Double.toString( ((double) accum.sum) / accum.count );
       }
     }



  /**
   * Options supported by {@link WordCount}.
   * <p>
   * Inherits standard configuration options.
   */
  public static interface Options extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    String getInput();
    void setInput(String value);

    @Description("Path of the file to write to")
    @Default.InstanceFactory(OutputFactory.class)
    String getOutput();
    void setOutput(String value);

    /**
     * Returns gs://${STAGING_LOCATION}/"sorts.txt" as the default destination.
     */
    public static class OutputFactory implements DefaultValueFactory<String> {
      @Override
      public String create(PipelineOptions options) {
        DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
        if (dataflowOptions.getStagingLocation() != null) {
          return GcsPath.fromUri(dataflowOptions.getStagingLocation())
              .resolve("sorts.txt").toString();
        } else {
          throw new IllegalArgumentException("Must specify --output or --stagingLocation");
        }
      }
    }

     /**
     * By default (numShards == 0), the system will choose the shard count.
     * Most programs will not need this option.
     */
    @Description("Number of output shards (0 if the system should choose automatically)")
    @Default.Integer(1)
    int getNumShards();
    void setNumShards(int value);
  }     

}

有什么想法会导致这种情况吗?

4

1 回答 1

1

我们已经意识到这个问题,并且正在努力修复应该很快就会提供的修复。

现在,您应该能够使用 SerializableCoder 而不是 AvroCoder 作为累加器。

于 2015-02-06T23:51:16.820 回答