2

我想逐块(不是逐行)读取 Hadoop 中的大文件,其中每个块的大小接近 5 MB。为此,我编写了一个自定义recordreader. 但它给了我一个错误Premature EOF from inputStream,这是由 , 引起的nextKeyValue()readfully()而阅读。

这是我的代码:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WholeFileRecordReader extends RecordReader<Text, apriori> {

public Text key = new Text("");
public apriori value = new apriori();
public Configuration job;
public FileSplit filesplit;
public FSDataInputStream in;
public Boolean processed = false;
public int len = 5000000;
public long filepointer = 0;
public int mapperFlag = 0;



public WholeFileRecordReader(FileSplit arg0, TaskAttemptContext arg1) {
    this.filesplit = arg0;
    this.job=arg1.getConfiguration();
}


@Override
public void close() throws IOException {

}

@Override
public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
}

@Override
public apriori getCurrentValue() throws IOException, InterruptedException {
        return value;
}

@Override
public float getProgress() throws IOException, InterruptedException {
    return processed ? 1.0f : 0.0f;
}

@Override
public void initialize(InputSplit arg0, TaskAttemptContext arg1)
        throws IOException, InterruptedException {
    this.job = arg1.getConfiguration();
    this.filesplit = (FileSplit)arg0;
    final Path file = filesplit.getPath();


    FileSystem fs = file.getFileSystem(job);
    in = fs.open(file);
    }

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
    if ((!processed)&&(filesplit.getLength()>filepointer)) {
        byte[] contents = new byte[ len];
        Path file = filesplit.getPath();
        key.set(file.getName());
        in.seek(filepointer);
        try {
            IOUtils.readFully(in, contents, 0, len);
            value.set(contents, 0, len);
                        } finally {
    //        IOUtils.closeStream(in);
        }
        filepointer = filepointer + len;
        processed = false;
        return true;
    }
    else if((!processed)&&(filesplit.getLength()<filepointer))
    {
        Path file = filesplit.getPath();
        key.set(file.getName());
        int last = (int)(filesplit.getLength()-(filepointer-len));
        byte[] contents = new byte[last];
        in.seek(filepointer-len);
        try {
            IOUtils.readFully(in, contents, 0, last);
            mapperFlag =1;
            value.set(contents, 0, last,mapperFlag);

        } finally {
            IOUtils.closeStream(in);
        }
        processed = true;
        return true;
    }

    return false;
 }
 }
4

0 回答 0