所以我也遇到了同样的问题，经过一段时间的调查，我想出了以下可行的解决方法。
The problem is caused by the internal implementation of sequence file creation: it relies on the file length reported by the file system, which is only updated once per block (64 MB).
So I created the following class to create the reader. It wraps the Hadoop FileSystem in my own implementation, which overrides the getLength method to return the actual file length instead:
public class SequenceFileUtil {
public SequenceFile.Reader createReader(Configuration conf, Path path) throws IOException {
WrappedFileSystem fileSystem = new WrappedFileSystem(FileSystem.get(conf));
return new SequenceFile.Reader(fileSystem, path, conf);
}
private class WrappedFileSystem extends FileSystem
{
private final FileSystem nestedFs;
public WrappedFileSystem(FileSystem fs){
this.nestedFs = fs;
}
@Override
public URI getUri() {
return nestedFs.getUri();
}
@Override
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
return nestedFs.open(f,bufferSize);
}
@Override
public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
return nestedFs.create(f, permission,overwrite,bufferSize, replication, blockSize, progress);
}
@Override
public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException {
return nestedFs.append(f, bufferSize, progress);
}
@Override
public boolean rename(Path src, Path dst) throws IOException {
return nestedFs.rename(src, dst);
}
@Override
public boolean delete(Path path) throws IOException {
return nestedFs.delete(path);
}
@Override
public boolean delete(Path f, boolean recursive) throws IOException {
return nestedFs.delete(f, recursive);
}
@Override
public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
return nestedFs.listStatus(f);
}
@Override
public void setWorkingDirectory(Path new_dir) {
nestedFs.setWorkingDirectory(new_dir);
}
@Override
public Path getWorkingDirectory() {
return nestedFs.getWorkingDirectory();
}
@Override
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
return nestedFs.mkdirs(f, permission);
}
@Override
public FileStatus getFileStatus(Path f) throws IOException {
return nestedFs.getFileStatus(f);
}
@Override
public long getLength(Path f) throws IOException {
DFSClient.DFSInputStream open = new DFSClient(nestedFs.getConf()).open(f.toUri().getPath());
long fileLength = open.getFileLength();
long length = nestedFs.getLength(f);
if (length < fileLength){
//We might have uncompleted blocks
return fileLength;
}
return length;
}
}
}