1

我正在使用 Hadoop 1.0.3。

我将日志写入到 HDFS 的 Hadoop 序列文件中,在每组日志之后调用 syncFS() 但我从不关闭文件(除非我执行每日滚动)。

我要保证的是,在文件仍在写入时,该文件可供读者使用。

我可以通过 FSDataInputStream 读取序列文件的字节,但是如果我尝试使用 SequenceFile.Reader.next(key,val),它会在第一次调用时返回 false。

我知道数据在文件中,因为我可以使用 FSDataInputStream 或 cat 命令读取它,并且我 100% 确定调用了 syncFS()。

我检查了 namenode 和 datanode 日志,没有错误或警告。

为什么 SequenceFile.Reader 无法读取我当前正在写入的文件?

4

4 回答 4

3

您无法确保读取完全写入数据节点端的磁盘。您可以在以下状态的文档中看到这一点DFSClient#DFSOutputStream.sync()

  All data is written out to datanodes. It is not guaranteed that data has
  been flushed to persistent store on the datanode. Block allocations are
  persisted on namenode.

所以它基本上用当前信息更新namenode的块映射并将数据发送到datanode。由于您无法将数据刷新到数据节点上的磁盘,但您直接从数据节点读取数据,因此您遇到了数据在某处缓冲且无法访问的时间范围。因此,您的序列文件阅读器会认为数据流已完成(或为空)并且无法读取向反序列化过程返回 false 的其他字节。

如果完全接收到块,数据节点会将数据写入磁盘(它是预先写入的,但不能从外部读取)。因此,一旦达到您的块大小或您的文件已预先关闭并最终确定一个块,您就可以从文件中读取。这在分布式环境中完全有意义,因为您的编写器可能会死掉并且无法正确完成一个块——这是一个一致性问题。

所以解决方法是使块大小非常小,以便更频繁地完成块。但这不是那么有效,我希望清楚您的要求不适合 HDFS。

于 2013-01-17T12:30:29.893 回答
1

SequenceFile.Reader 无法读取正在写入的文件的原因是它使用文件长度来执行它的魔法。

写入第一个块时文件长度保持为 0,并且仅在块已满时更新(默认为 64MB)。然后文件大小停留在 64MB,直到第二个块被完全写入,依此类推......

这意味着您无法使用 SequenceFile.Reader 读取序列文件中最后一个不完整的块,即使可以直接使用 FSInputStream 读取原始数据。

关闭文件也修复了文件长度,但在我的情况下,我需要在文件关闭之前读取文件。

于 2013-01-31T13:57:34.737 回答
1

所以我遇到了同样的问题,经过一些调查和时间,我想出了以下可行的解决方法。

So the problem is due to internal implementation of sequence file creation and the fact that it is using the file length which is updated per block of 64 MBs.

So I created the following class to create the reader and I wrapped the hadoop FS with my own while I overriding the get length method to return the file length instead:

public class SequenceFileUtil {

    public SequenceFile.Reader createReader(Configuration conf, Path path) throws IOException {

        WrappedFileSystem fileSystem = new WrappedFileSystem(FileSystem.get(conf));

        return new SequenceFile.Reader(fileSystem, path, conf);
    }

    private class WrappedFileSystem extends FileSystem
    {
        private final FileSystem nestedFs;

        public WrappedFileSystem(FileSystem fs){
            this.nestedFs = fs;
        }

        @Override
        public URI getUri() {
            return nestedFs.getUri();
        }

        @Override
        public FSDataInputStream open(Path f, int bufferSize) throws IOException {
            return nestedFs.open(f,bufferSize);
        }

        @Override
        public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
            return nestedFs.create(f, permission,overwrite,bufferSize, replication, blockSize, progress);
        }

        @Override
        public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException {
            return nestedFs.append(f, bufferSize, progress);
        }

        @Override
        public boolean rename(Path src, Path dst) throws IOException {
            return nestedFs.rename(src, dst);
        }

        @Override
        public boolean delete(Path path) throws IOException {
            return nestedFs.delete(path);
        }

        @Override
        public boolean delete(Path f, boolean recursive) throws IOException {
            return nestedFs.delete(f, recursive);
        }

        @Override
        public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
            return nestedFs.listStatus(f);
        }

        @Override
        public void setWorkingDirectory(Path new_dir) {
            nestedFs.setWorkingDirectory(new_dir);
        }

        @Override
        public Path getWorkingDirectory() {
            return nestedFs.getWorkingDirectory();
        }

        @Override
        public boolean mkdirs(Path f, FsPermission permission) throws IOException {
            return nestedFs.mkdirs(f, permission);
        }

        @Override
        public FileStatus getFileStatus(Path f) throws IOException {
            return nestedFs.getFileStatus(f);
        }


        @Override
        public long getLength(Path f) throws IOException {

            DFSClient.DFSInputStream open =  new DFSClient(nestedFs.getConf()).open(f.toUri().getPath());
            long fileLength = open.getFileLength();
            long length = nestedFs.getLength(f);

            if (length < fileLength){
                //We might have uncompleted blocks
                return fileLength;
            }

            return length;
        }


    }
}
于 2014-10-23T22:13:17.660 回答
-1

我遇到了类似的问题,这是我修复它的方法:http: //mail-archives.apache.org/mod_mbox/hadoop-common-user/201303.mbox/%3CCALtSBbY+LX6fiKutGsybS5oLXxZbVuN0WvW_a5JbExY98hJfig@mail.gmail.com%3E

于 2013-03-05T19:19:54.217 回答