1

我有一个对象负责在 HDFS 上打开要写入的文件。close()一旦调用该方法,该对象就会重命名它刚刚编写的文件。该机制在本地模式下运行时有效,但在集群模式下无法重命名文件。

//Constructor
public WriteStream() {
    path = String.format("in_progress/file");
    try {
        OutputStream outputStream = fileSystem.create(new Path(hdfs_path+path), new Progressable() {public void progress() { System.out.print("."); }
            });
        writer = new BufferedWriter(new OutputStreamWriter(outputStream));
    } catch (IOException e) {
        e.printStackTrace();
    }
}

public void close() {
    String newPath = String.format("%s_dir/%s_file", date, timestamp);
    try {
        fileSystem.rename(new Path(hdfs_path+path), new Path(hdfs_path+newPath));
        writer.close();
    } catch (IOException e) {
        e.printStackTrace();
    }
}

你以前经历过吗?

4

2 回答 2

3

在本地模式下执行时,显然FileSystem.rename(Path)会在路径上创建丢失的目录,但在集群模式下运行时不会。此代码适用于两种模式:

public void close() {
    String dirPath = String.format("%s_dir/", date, timestamp);
    String newPath = String.format("%s_dir/%s_file", date, timestamp);
    try {
        fileSystem.mkdir(new Path(hdfs_path+dirPath));
        fileSystem.rename(new Path(hdfs_path+path), new Path(hdfs_path+newPath));
        writer.close();
    } catch (IOException e) {
        e.printStackTrace();
    }
}
于 2013-06-27T13:16:09.297 回答
1

只是好奇,但是你怎么能重命名一个正式不存在的文件(因为你当时还在写)?

解决方法是在文件完成后重命名。也就是说,当您调用 close 方法时。

所以你的代码应该是这样的:

public void close() {
    String newPath = String.format("%s_dir/%s_file", date, timestamp);
    try {
        writer.close();
        fileSystem.rename(new Path(hdfs_path+path), new Path(hdfs_path+newPath));
    } catch (IOException e) {
        e.printStackTrace();
    }
}
于 2013-06-27T09:30:50.877 回答