有这样一条示例记录:100,1:2:3
我想把它规范化(展开)为:
100,1
100,2
100,3
我的一位同事用 Pig 脚本实现了同样的功能,而我的 MapReduce 代码耗时更长。我之前使用的是默认的 TextInputFormat;为了提高性能,我决定编写一个自定义的 InputFormat 类,并配套一个自定义的 RecordReader。我参考 LineRecordReader 类,写出了下面的代码。
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;
import com.normalize.util.Splitter;
/**
 * Reads the lines of a {@link FileSplit} and turns each line of the form {@code key,value}
 * into a ({@code key}, {@code value}) pair of {@link Text} objects.
 *
 * <p>Only the text between the first and the second comma becomes the value; anything after a
 * second comma is ignored. Lines that contain no comma at all are reported on stderr and
 * skipped instead of failing the task.
 *
 * <p>Split boundaries are handled like Hadoop's LineRecordReader. A split that does not start at
 * offset 0 skips its first (possibly partial) line, because the previous split reads every line
 * that starts before its end.
 *
 * <p>NOTE(review): the file is opened with {@code fs.open} and read as raw bytes, so compressed
 * input is not decompressed here — confirm inputs are uncompressed.
 */
public class NormalRecordReader extends RecordReader<Text, Text> {
  private long start;
  private long pos;
  private long end;
  private LineReader in;
  private int maxLineLength;
  private Text key = null;
  private Text value = null;
  /** Reusable buffer holding the raw bytes of the current input line. */
  private Text line = null;

  /**
   * Opens the split's file, positions the reader at the split start and skips the partial first
   * line of any split that does not begin at offset 0.
   *
   * @param genericSplit the split to read; must be a {@link FileSplit}
   * @param context the task context supplying the job configuration
   * @throws IOException if the file cannot be opened or read
   */
  @Override
  public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
    FileSplit split = (FileSplit) genericSplit;
    Configuration job = context.getConfiguration();
    this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
    start = split.getStart();
    end = start + split.getLength();
    final Path file = split.getPath();
    FileSystem fs = file.getFileSystem(job);
    FSDataInputStream fileIn = fs.open(file);

    boolean skipFirstLine = false;
    if (start != 0) {
      // Back up one byte so that, if the split starts exactly at a line start, the "line" we
      // discard below is just the preceding newline and the real first line is kept.
      skipFirstLine = true;
      --start;
      fileIn.seek(start);
    }
    in = new LineReader(fileIn, job);
    if (skipFirstLine) {
      // The previous split owns the line that straddles our start; discard it.
      start += in.readLine(new Text(), 0, (int) Math.min(Integer.MAX_VALUE, end - start));
    }
    this.pos = start;
  }

  /**
   * Advances to the next well-formed {@code key,value} line of the split.
   *
   * @return {@code true} if a new key/value pair is available, {@code false} at end of split
   * @throws IOException if reading the underlying stream fails
   */
  @Override
  public boolean nextKeyValue() throws IOException {
    if (key == null) {
      key = new Text();
    }
    if (value == null) {
      value = new Text();
    }
    while (readNextLine()) {
      // Slice the raw UTF-8 bytes directly instead of decoding to String and re-encoding:
      // ',' is ASCII, so it never occurs inside a multi-byte UTF-8 sequence.
      byte[] bytes = line.getBytes();
      int length = line.getLength(); // getBytes() may be longer than the valid data
      int firstComma = indexOf(bytes, 0, length, (byte) ',');
      if (firstComma < 0) {
        System.err.println(line + " is malformed.");
        continue;
      }
      int secondComma = indexOf(bytes, firstComma + 1, length, (byte) ',');
      int valueEnd = secondComma < 0 ? length : secondComma;
      key.set(bytes, 0, firstComma);
      value.set(bytes, firstComma + 1, valueEnd - firstComma - 1);
      return true;
    }
    key = null;
    value = null;
    return false;
  }

  /**
   * Reads the next line that starts inside this split into {@link #line}, skipping lines of
   * {@code maxLineLength} bytes or more.
   *
   * @return {@code true} if a line was read, {@code false} at end of split or end of file
   */
  private boolean readNextLine() throws IOException {
    if (line == null) {
      line = new Text();
    }
    while (pos < end) {
      int newSize = in.readLine(line, maxLineLength,
          Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength));
      if (newSize == 0) {
        return false;
      }
      pos += newSize;
      if (newSize < maxLineLength) {
        return true;
      }
      // line too long. try again
      System.out.println("Skipped line of size " + newSize + " at pos " + (pos - newSize));
    }
    return false;
  }

  /** Returns the index of the first {@code target} byte in {@code bytes[from, to)}, or -1. */
  private static int indexOf(byte[] bytes, int from, int to, byte target) {
    for (int i = from; i < to; i++) {
      if (bytes[i] == target) {
        return i;
      }
    }
    return -1;
  }

  @Override
  public Text getCurrentKey() {
    return key;
  }

  @Override
  public Text getCurrentValue() {
    return value;
  }

  /**
   * Get the progress within the split, as the fraction of the split's bytes consumed so far.
   */
  @Override
  public float getProgress() {
    if (start == end) {
      return 0.0f;
    } else {
      return Math.min(1.0f, (pos - start) / (float) (end - start));
    }
  }

  /** Closes the underlying line reader, if it was opened. */
  @Override
  public synchronized void close() throws IOException {
    if (in != null) {
      in.close();
    }
  }
}
虽然这段代码可以运行,但我没有看到任何性能提升。这里我按“,”拆分记录,把 100 作为键、把 1:2:3 作为值。然后映射器只做下面这些事:
/**
 * Emits one (key, element) pair for every ':'-separated element of {@code value}.
 *
 * <p>For example, key {@code 100} with value {@code 1:2:3} produces {@code (100,1)},
 * {@code (100,2)} and {@code (100,3)}. Empty elements are kept: {@code 1::2} yields an empty
 * middle element, and an empty value yields a single empty element.
 *
 * @param key the record key, written unchanged with each element
 * @param value the ':'-separated list to expand
 * @param context the task context that receives the output pairs
 * @throws IOException if writing an output pair fails
 * @throws InterruptedException if the task is interrupted while writing
 */
public void map(Text key, Text value, Context context)
    throws IOException, InterruptedException {
  // Scan the raw UTF-8 bytes instead of decoding to String and building a List:
  // ':' is ASCII, so it never occurs inside a multi-byte UTF-8 sequence.
  byte[] bytes = value.getBytes();
  int length = value.getLength(); // getBytes() may be longer than the valid data
  // One reusable output object: Hadoop serializes the pair during write(), so reusing the same
  // Text is safe and avoids an allocation per emitted record.
  Text element = new Text();
  int begin = 0;
  for (int i = 0; i <= length; i++) {
    // i == length acts as a virtual delimiter that flushes the last element.
    if (i == length || bytes[i] == ':') {
      element.set(bytes, begin, i - begin);
      context.write(key, element);
      begin = i + 1;
    }
  }
}
我用自定义的 Splitter 类来拆分数据,因为我发现 String.split() 比较慢。它的 split 方法如下:
/**
 * Splits {@code dataToSplit} on every occurrence of {@code delim}.
 *
 * <p>Empty pieces are preserved, including a trailing one (e.g. {@code "a:"} gives
 * {@code ["a", ""]}), and input without any delimiter yields a single-element list.
 *
 * <p>NOTE(review): assuming {@code delimLength == delim.length()}, an empty delimiter makes
 * {@code indexOf} match at the same position forever and this never terminates — confirm the
 * constructor rejects empty delimiters.
 *
 * @return the pieces between delimiters, in order
 */
public List<String> split() {
  List<String> pieces = new ArrayList<String>();
  int from = 0;
  int hit;
  // Cut out each piece that is followed by a delimiter...
  while ((hit = dataToSplit.indexOf(delim, from)) != -1) {
    pieces.add(dataToSplit.substring(from, hit));
    from = hit + delimLength;
  }
  // ...then whatever remains after the last delimiter.
  pieces.add(dataToSplit.substring(from));
  return pieces;
}
这段代码还有什么可以改进的地方吗?