0

我正在尝试逐行读取hdfs文件,然后创建一个hdfs文件并逐行写入。我使用的代码如下所示:

            Path FileToRead=new Path(inputPath);
        FileSystem hdfs = FileToRead.getFileSystem(new Configuration());            
        FSDataInputStream fis = hdfs.open(FileToRead);
        BufferedReader reader = new BufferedReader(new InputStreamReader(fis));

        String line;
            line = reader.readLine(); 
            while (line != null){

                String[] lineElem = line.split(",");
                for(int i=0;i<10;i++){

                    MyMatrix[i][Integer.valueOf(lineElem[0])-1] = Double.valueOf(lineElem[i+1]);
                }

                line=reader.readLine();
        } 

        reader.close();
        fis.close();


        Path FileToWrite = new Path(outputPath+"/V"); 
        FileSystem fs = FileSystem.get(new Configuration());
        FSDataOutputStream fileOut = fs.create(FileToWrite);
        BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fileOut));
        writer.write("check");
        writer.close();
        fileOut.close();

当我在 outputPath 文件中运行此代码时,尚未创建 V。但是,如果我将读取部分替换为写入部分,则会创建文件并将检查写入其中。谁能帮我理解如何正确使用它们以便能够先读取整个文件然后逐行写入文件?

我还尝试了另一种代码来读取一个文件并写入另一个文件,但该文件将被创建但没有写入任何内容!

我这样使用:

  hadoop jar main.jar program2.Main input output

然后在我的第一份工作中,我使用 map reduce 类从 arg[0] 读取并写入 args[1]+"/NewV" 中的文件,并且它可以工作。在我的其他类(非 map reduce)中,我使用 args[1]+"/NewV" 作为输入路径,使用 output+"/V_0" 作为输出路径(我将这些字符串传递给构造函数)。这是该类的代码:

 public class Init_V {

String inputPath, outputPath;


public Init_V(String inputPath, String outputPath) throws Exception {

    this.inputPath = inputPath;
    this.outputPath = outputPath;


    try{            

        FileSystem fs = FileSystem.get(new Configuration());
        Path FileToWrite = new Path(outputPath+"/V.txt"); 
        Path FileToRead=new Path(inputPath);
        BufferedWriter output = new BufferedWriter
         (new OutputStreamWriter(fs.create(FileToWrite,
                 true)));  

        BufferedReader reader = new
            BufferedReader(new InputStreamReader(fs.open(FileToRead)));
                 String data;
                 data = reader.readLine();
                 while ( data != null ) 
                 {
                     output.write(data);
                     data = reader.readLine();
                 }
                 reader.close();                     
                 output.close(); }catch(Exception e){
}

}

}
4

1 回答 1

1

我认为,您需要了解 hadoop 如何正常工作。在hadoop中,很多事情都是由系统完成的,你只是给出输入和输出路径,然后如果路径有效,它们就会被hadoop打开和创建。检查以下示例;

public int run (String[] args) throws Exception{

    if(args.length != 3){
        System.err.println("Usage: MapReduce <input path> <output path> ");
        ToolRunner.printGenericCommandUsage(System.err);
    }
    Job job = new Job();
    job.setJarByClass(MyClass.class);
    job.setNumReduceTasks(5);
    job.setJobName("myclass");
    FileInputFormat.addInputPath(job, new Path(args[0]) );
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setMapperClass(MyMapper.class);
    job.setReducerClass(MyReducer.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    return job.waitForCompletion(true) ? 0:1 ;
}


/* ----------------------main---------------------*/
public static void main(String[] args) throws Exception{    

    int exitCode = ToolRunner.run(new MyClass(), args);
    System.exit(exitCode);
}

正如您在此处看到的,您只需初始化必要的变量,而读写是由 hadoop 完成的。

此外,在您的Mapper 类中,您在 map 中说context.write(key, value),同样在您的Reduce 类中,您正在做同样的事情,它为您编写。

如果您使用 BufferedWriter/Reader,它将写入您的本地文件系统而不是 HDFS。要查看 HDFS 中的文件,您应该编写hadoop fs -ls <path>,您通过ls命令查看的文件位于本地文件系统中

编辑:为了使用读/写,您应该了解以下内容:假设您的 hadoop 网络中有 N 台机器。当你想读的时候,你将不知道哪个映射器在读,同样在写。所以,所有的映射器和减速器都应该有那些不给出异常的路径。

我不知道您是否可以使用任何其他类,但您可以出于特定原因使用两种方法:startupcleanup. 这些方法在每个 map 和 reduce worker 中只使用一次。因此,如果您想读写,可以使用该文件。读写与普通java代码相同。例如,您想查看每个键的内容,并希望将其写入 txt。您可以执行以下操作:

//in reducer
BufferedReader bw ..;

void startup(...){
     bw  = new ....;
}

void reduce(...){
    while(iter.hasNext()){ ....;
    }
    bw.write(key, ...);
}
void cleanup(...){
    bw.close();
}
于 2013-05-12T19:39:52.037 回答