我正在使用 Hadoop 1.0.3。

我将日志写入到 HDFS 的 Hadoop 序列文件中,在每组日志之后调用 syncFS() 但我从不关闭文件(除非我执行每日滚动)。


我可以通过 FSDataInputStream 读取序列文件的字节,但是如果我尝试使用 SequenceFile.Reader.next(key,val),它会在第一次调用时返回 false。

我知道数据在文件中,因为我可以使用 FSDataInputStream 或 cat 命令读取它,并且我 100% 确定调用了 syncFS()。

我检查了 namenode 和 datanode 日志,没有错误或警告。

为什么 SequenceFile.Reader 无法读取我当前正在写入的文件?


4 回答 4



  All data is written out to datanodes. It is not guaranteed that data has
  been flushed to persistent store on the datanode. Block allocations are
  persisted on namenode.

所以它基本上用当前信息更新namenode的块映射并将数据发送到datanode。由于您无法将数据刷新到数据节点上的磁盘,但您直接从数据节点读取数据,因此您遇到了数据在某处缓冲且无法访问的时间范围。因此,您的序列文件阅读器会认为数据流已完成(或为空)并且无法读取向反序列化过程返回 false 的其他字节。


所以解决方法是使块大小非常小,以便更频繁地完成块。但这不是那么有效,我希望清楚您的要求不适合 HDFS。

于 2013-01-17T12:30:29.893 回答

SequenceFile.Reader 无法读取正在写入的文件的原因是它使用文件长度来执行它的魔法。

写入第一个块时文件长度保持为 0,并且仅在块已满时更新(默认为 64MB)。然后文件大小停留在 64MB,直到第二个块被完全写入,依此类推......

这意味着您无法使用 SequenceFile.Reader 读取序列文件中最后一个不完整的块,即使可以直接使用 FSInputStream 读取原始数据。


于 2013-01-31T13:57:34.737 回答


So the problem is due to internal implementation of sequence file creation and the fact that it is using the file length which is updated per block of 64 MBs.

So I created the following class to create the reader and I wrapped the hadoop FS with my own while I overriding the get length method to return the file length instead:

public class SequenceFileUtil {

    public SequenceFile.Reader createReader(Configuration conf, Path path) throws IOException {

        WrappedFileSystem fileSystem = new WrappedFileSystem(FileSystem.get(conf));

        return new SequenceFile.Reader(fileSystem, path, conf);

    private class WrappedFileSystem extends FileSystem
        private final FileSystem nestedFs;

        public WrappedFileSystem(FileSystem fs){
            this.nestedFs = fs;

        public URI getUri() {
            return nestedFs.getUri();

        public FSDataInputStream open(Path f, int bufferSize) throws IOException {
            return nestedFs.open(f,bufferSize);

        public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
            return nestedFs.create(f, permission,overwrite,bufferSize, replication, blockSize, progress);

        public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException {
            return nestedFs.append(f, bufferSize, progress);

        public boolean rename(Path src, Path dst) throws IOException {
            return nestedFs.rename(src, dst);

        public boolean delete(Path path) throws IOException {
            return nestedFs.delete(path);

        public boolean delete(Path f, boolean recursive) throws IOException {
            return nestedFs.delete(f, recursive);

        public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
            return nestedFs.listStatus(f);

        public void setWorkingDirectory(Path new_dir) {

        public Path getWorkingDirectory() {
            return nestedFs.getWorkingDirectory();

        public boolean mkdirs(Path f, FsPermission permission) throws IOException {
            return nestedFs.mkdirs(f, permission);

        public FileStatus getFileStatus(Path f) throws IOException {
            return nestedFs.getFileStatus(f);

        public long getLength(Path f) throws IOException {

            DFSClient.DFSInputStream open =  new DFSClient(nestedFs.getConf()).open(f.toUri().getPath());
            long fileLength = open.getFileLength();
            long length = nestedFs.getLength(f);

            if (length < fileLength){
                //We might have uncompleted blocks
                return fileLength;

            return length;

于 2014-10-23T22:13:17.660 回答

我遇到了类似的问题,这是我修复它的方法:http: //mail-archives.apache.org/mod_mbox/hadoop-common-user/201303.mbox/%3CCALtSBbY+LX6fiKutGsybS5oLXxZbVuN0WvW_a5JbExY98hJfig@mail.gmail.com%3E

于 2013-03-05T19:19:54.217 回答