-1

这是我的代码片段

@Override
    protected RecordWriter<String, String> getBaseRecordWriter(
            FileSystem fs, JobConf job, String name, Progressable arg3)
                    throws IOException {
        Path file2 = FileOutputFormat.getOutputPath(job);
        String path = file2.toUri().getPath()+File.separator+ name;
        FSDataOutputStream fileOut = new FSDataOutputStream( new BufferedOutputStream(new FileOutputStream(path, true),104857600)), null);
        return new LineRecordWriter<String, String>(fileOut, "\t");
    }

我正在使用 Spark 1.6.1,在我的代码中,我使用saveAsHadoopFile()了方法,为此我编写了一个派生自 org.apache.hadoop.mapred.lib.MultipleTextOutputFormat 的类 OutputFormat,并覆盖了上述方法。

在集群上,它会在输出文件中写入损坏的记录。BufferedOutputStream我认为这是 因为

FSDataOutputStream fileOut = new FSDataOutputStream(
                 new BufferedOutputStream(new FileOutputStream(path, true),104857600)), null);

我们可以有任何替代方案bufferedOutputStream,因为它会在缓冲区满时立即写入。

注意:更新了代码。带来不便敬请谅解。

4

1 回答 1

0

我遇到了问题 .. 在集群上,每个工作人员都会尝试写入相同的(共享)文件,因为不同机器上的两个工作人员意味着不同的 JVM,因此同步文件写入不会在这里工作。这就是腐败记录的原因。我也使用了 NFS,这是一个重要因素。

于 2016-10-25T11:55:51.113 回答