由于对其他人有帮助,使用上面的提示,我能够进行实施:
CustomOutputFormat<K, V> extends org.apache.hadoop.mapred.TextOutputFormat<K, V> {....}
'getRecordWriter' 的内置实现中只有一行更改为:
String keyValueSeparator = job.get("mapred.textoutputformat.separator", "");
代替:
String keyValueSeparator = job.get("mapred.textoutputformat.separator", "\t");
在将其编译成 Jar 并将其包含到我的 hadoop 流式调用中(通过 hadoop 流式处理的说明)后,调用看起来像:
hadoop jar /usr/lib/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar \
-archives 'hdfs:///user/the/path/to/your/jar/onHDFS/theNameOfTheJar.jar' \
-libjars theNameOfTheJar.jar \
-outputformat com.yourcompanyHere.package.path.tojavafile.CustomOutputFormat \
-file yourMapper.py -mapper yourMapper.py \
-file yourReducer.py -reducer yourReducer.py \
-input $yourInputFile \
-output $yourOutputDirectoryOnHDFS
我还将 jar 包含在我发出该调用的文件夹中。
它非常适合我的需求(并且它在减速器之后的行尾没有创建标签)。
更新:基于暗示这确实对其他人有帮助的评论,这是我的 CustomOutputFormat.java 文件的完整来源:
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
public class CustomOutputFormat<K, V> extends TextOutputFormat<K, V> {
public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job, String name,
Progressable progress) throws IOException {
boolean isCompressed = getCompressOutput(job);
//Channging the default from '\t' to blank
String keyValueSeparator = job.get("mapred.textoutputformat.separator", ""); // '\t'
if (!isCompressed) {
Path file = FileOutputFormat.getTaskOutputPath(job, name);
FileSystem fs = file.getFileSystem(job);
FSDataOutputStream fileOut = fs.create(file, progress);
return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
} else {
Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,
GzipCodec.class);
// create the named codec
CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job);
// build the filename including the extension
Path file = FileOutputFormat.getTaskOutputPath(job, name + codec.getDefaultExtension());
FileSystem fs = file.getFileSystem(job);
FSDataOutputStream fileOut = fs.create(file, progress);
return new LineRecordWriter<K, V>(new DataOutputStream(
codec.createOutputStream(fileOut)), keyValueSeparator);
}
}
}
仅供参考:对于您的使用上下文,请务必检查这不会对映射器和减速器之间的 hadoop-streaming 托管交互(在分离键与值方面)产生不利影响。澄清:
根据我的测试——如果你的数据的每一行都有一个“标签”(每边都有一些东西),你可以保留内置的默认值:流式传输会将第一个标签之前的第一件事解释为你的“关键”,以及它之后的那一行作为你的“价值”。因此,它看不到“空值”,也不会附加显示在减速器之后的选项卡。(您会看到最终输出按“键”的值排序,流在每一行中解释为它看到的每个选项卡之前发生的内容。)
相反,如果您的数据中没有选项卡,并且您没有使用上述技巧覆盖默认值,那么您将在运行完成后看到选项卡,上述覆盖成为修复。