这是我的代码片段
@Override
protected RecordWriter<String, String> getBaseRecordWriter(
FileSystem fs, JobConf job, String name, Progressable arg3)
throws IOException {
Path file2 = FileOutputFormat.getOutputPath(job);
String path = file2.toUri().getPath()+File.separator+ name;
FSDataOutputStream fileOut = new FSDataOutputStream( new BufferedOutputStream(new FileOutputStream(path, true),104857600)), null);
return new LineRecordWriter<String, String>(fileOut, "\t");
}
我正在使用 Spark 1.6.1,在我的代码中,我使用saveAsHadoopFile()
了方法,为此我编写了一个派生自 org.apache.hadoop.mapred.lib.MultipleTextOutputFormat 的类 OutputFormat,并覆盖了上述方法。
在集群上,它会在输出文件中写入损坏的记录。BufferedOutputStream
我认为这是 因为
FSDataOutputStream fileOut = new FSDataOutputStream(
new BufferedOutputStream(new FileOutputStream(path, true),104857600)), null);
我们可以有任何替代方案bufferedOutputStream
,因为它会在缓冲区满时立即写入。
注意:更新了代码。带来不便敬请谅解。