
When I apply ParDo.of(new ParDoFn()) to the PCollection named textInput, the program throws this exception. But when I remove .apply(ParDo.of(new ParDoFn())), the program runs normally.

//SparkRunner

private static void testHadoop(Pipeline pipeline){
    // Read /tmp/kinglear.txt from HDFS (hdfs://localhost:9000) as <LongWritable, Text> pairs
    @SuppressWarnings("unchecked")
    Class<? extends FileInputFormat<LongWritable, Text>> inputFormatClass =
            (Class<? extends FileInputFormat<LongWritable, Text>>)
                    (Class<?>) TextInputFormat.class;
    @SuppressWarnings("unchecked")
    HadoopIO.Read.Bound<LongWritable, Text> readPTransform = HadoopIO.Read.from("hdfs://localhost:9000/tmp/kinglear.txt",
            inputFormatClass,
            LongWritable.class,
            Text.class);
    PCollection<KV<LongWritable, Text>> textInput = pipeline.apply(readPTransform)
            .setCoder(KvCoder.of(WritableCoder.of(LongWritable.class), WritableCoder.of(Text.class)));

    //OutputFormat
    @SuppressWarnings("unchecked")
    Class<? extends FileOutputFormat<LongWritable, Text>> outputFormatClass =
            (Class<? extends FileOutputFormat<LongWritable, Text>>)
                    (Class<?>) TemplatedTextOutputFormat.class;

    @SuppressWarnings("unchecked")
    HadoopIO.Write.Bound<LongWritable, Text> writePTransform = HadoopIO.Write.to("hdfs://localhost:9000/tmp/output", outputFormatClass, LongWritable.class, Text.class);

    // The exception is thrown with the ParDo step; writing textInput directly works
    textInput.apply(ParDo.of(new ParDoFn())).apply(writePTransform.withoutSharding());

    pipeline.run().waitUntilFinish();

}

2 Answers


Which Spark version are you running on? In my experience, the error you are getting is thrown by Spark 2.x's AccumulatorV2, while the Spark runner currently supports Spark 1.6.

answered 2017-02-21T07:50:48.113

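I ran into a similar problem when I created a custom accumulator extending org.apache.spark.util.AccumulatorV2. The cause was incorrect logic in my override def isZero: Boolean method. Basically, when you call copyAndReset, which by default calls copy() and then reset() on the copy, isZero() on the result must return true. If you look at the AccumulatorV2 source code, this is where the check happens: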

// Called by Java when serializing an object
final protected def writeReplace(): Any = {
 if (atDriverSide) {
   if (!isRegistered) {
     throw new UnsupportedOperationException(
       "Accumulator must be registered before send to executor")
   }
   val copyAcc = copyAndReset()
   assert(copyAcc.isZero, "copyAndReset must return a zero value copy")
   copyAcc.metadata = metadata
   copyAcc
 } else {
   this
 }
}

In particular, this part:

 val copyAcc = copyAndReset()
 assert(copyAcc.isZero, "copyAndReset must return a zero value copy")
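To make the contract concrete, here is a minimal Java sketch that runs without Spark. SimpleAccumulator is a hypothetical, simplified stand-in for AccumulatorV2: it only models copy(), reset(), isZero(), add(), value() and the default copyAndReset() (no registration, no merge). The hypothetical prepareForSend() method mirrors the driver-side check in writeReplace(). LongSumAccumulator keeps isZero() consistent with reset(), while BrokenSumAccumulator has an inverted isZero() and therefore fails the same check.

```java
public class AccumulatorContractDemo {

    // Hypothetical, simplified stand-in for Spark's AccumulatorV2<IN, OUT>.
    // It models only the copy/reset/isZero contract, not registration or merge.
    abstract static class SimpleAccumulator<IN, OUT> {
        abstract boolean isZero();
        abstract SimpleAccumulator<IN, OUT> copy();
        abstract void reset();
        abstract void add(IN v);
        abstract OUT value();

        // Mirrors the default copyAndReset(): copy(), then reset() the copy.
        SimpleAccumulator<IN, OUT> copyAndReset() {
            SimpleAccumulator<IN, OUT> copyAcc = copy();
            copyAcc.reset();
            return copyAcc;
        }

        // Hypothetical helper mirroring the driver-side check in writeReplace().
        SimpleAccumulator<IN, OUT> prepareForSend() {
            SimpleAccumulator<IN, OUT> copyAcc = copyAndReset();
            if (!copyAcc.isZero()) {
                throw new AssertionError("copyAndReset must return a zero value copy");
            }
            return copyAcc;
        }
    }

    // Correct: "zero" means exactly the state that reset() produces.
    static final class LongSumAccumulator extends SimpleAccumulator<Long, Long> {
        private long sum = 0L;
        private long count = 0L;

        @Override boolean isZero() { return sum == 0L && count == 0L; }

        @Override LongSumAccumulator copy() {
            LongSumAccumulator acc = new LongSumAccumulator();
            acc.sum = sum;
            acc.count = count;
            return acc;
        }

        @Override void reset() { sum = 0L; count = 0L; }
        @Override void add(Long v) { sum += v; count++; }
        @Override Long value() { return sum; }
    }

    // Buggy: isZero() is inverted, so a freshly reset copy is not reported as zero.
    static final class BrokenSumAccumulator extends SimpleAccumulator<Long, Long> {
        private long sum = 0L;

        @Override boolean isZero() { return sum != 0L; } // BUG: should be sum == 0L

        @Override BrokenSumAccumulator copy() {
            BrokenSumAccumulator acc = new BrokenSumAccumulator();
            acc.sum = sum;
            return acc;
        }

        @Override void reset() { sum = 0L; }
        @Override void add(Long v) { sum += v; }
        @Override Long value() { return sum; }
    }

    public static void main(String[] args) {
        LongSumAccumulator good = new LongSumAccumulator();
        good.add(5L);
        good.add(7L);
        SimpleAccumulator<Long, Long> sent = good.prepareForSend();
        System.out.println("good: original=" + good.value() + ", sent copy isZero=" + sent.isZero());

        BrokenSumAccumulator bad = new BrokenSumAccumulator();
        bad.add(5L);
        try {
            bad.prepareForSend();
        } catch (AssertionError e) {
            System.out.println("broken: " + e.getMessage());
        }
    }
}
```

Running main prints "good: original=12, sent copy isZero=true" followed by "broken: copyAndReset must return a zero value copy". In a real AccumulatorV2 subclass the same rule applies: whatever reset() leaves behind is exactly what isZero() must recognize as zero.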

Hope this helps. Happy Sparking!

answered 2017-09-09T00:45:26.357