您可以像Guy提到的那样预处理您的输入,或者可以应用此处描述的其他技巧。
我认为最干净的解决方案是实现一个自定义InputFormat(连同它的 RecordReader),它创建一个记录/START-END。Pig 的LoadFunc位于 Hadoop 的 InputFormat 之上,因此您可以定义您的 LoadFunc 将使用的 InputFormat。
自定义 LoadFunc 的原始骨架实现如下所示:
import java.io.IOException;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
public class CustomLoader extends LoadFunc {
private RecordReader reader;
private TupleFactory tupleFactory;
public CustomLoader() {
tupleFactory = TupleFactory.getInstance();
}
@Override
public InputFormat getInputFormat() throws IOException {
return new MyInputFormat(); //custom InputFormat
}
@Override
public Tuple getNext() {
Tuple result = null;
try {
if (!reader.nextKeyValue()) {
return null;
}
//value can be a custom Writable containing your name/value
//field pairs for a given record
Object value = reader.getCurrentValue();
result = tupleFactory.newTuple();
// ...
//append fields to tuple
}
catch (Exception e) {
// ...
}
return result;
}
@Override
public void prepareToRead(RecordReader reader, PigSplit pigSplit)
throws IOException {
this.reader = reader;
}
@Override
public void setLocation(String location, Job job) throws IOException {
FileInputFormat.setInputPaths(job, location);
}
}
在LoadFunc
初始化InputFormat
和 its之后RecordReader
,它会定位数据的输入位置并开始从 recordReader 获取记录,创建结果元组(getNext())直到输入被完全读取。
关于自定义 InputFormat 的一些说明:
我将创建一个自定义 InputFormat,其中 RecordReader 是以下的修改版本
org.apache.hadoop.mapreduce.lib.input.LineRecordReader
: 大多数方法将保持不变,除了initialize()
:它将调用自定义 LineReader(基于org.apache.hadoop.util.LineReader
)。InputFormat 的键是行偏移量(Long),值是自定义的 Writable。这会将记录的字段(即 START-END 之间的数据)保存为键值对列表。每次nextKeyValue()
调用 RecordReader 时,LineReader 都会将记录写入自定义 Writable。整个事情的要点是你如何实现 LineReader.readLine()
。
另一种可能更简单的方法是将 TextInputFormat 的分隔符(在 Hadoop 0.23 中可配置,请参阅 参考资料textinputformat.record.delimiter
)更改为适合您的数据结构的分隔符(如果可能的话)。在这种情况下,您最终将获得Text
需要从中拆分和提取 KV 对并放入元组的数据。