1

我有一个外部程序,它将文件作为输入并提供输出文件

     //for example 
     input file: IN_FILE
     output file: OUT_FILE

    //Run External program 
     ./vx < ${IN_FILE} > ${OUT_FILE}

我想要 HDFS 中的输入和输出文件

我有 8 个节点的集群。我有 8 个输入文件,每个文件有 1 行

    //1 input file :       1.txt 
           1:0,0,0
    //2 input file :       2.txt 
           2:0,0,128
    //3 input file :       3.txt 
           3:0,128,0
    //5 input file :       4.txt 
           4:0,128,128
    //5 input file :       5.txt 
           5:128,0,0
    //6 input file :       6.txt 
           6:128,0,128
    //7 input file :       7.txt 
           7:128,128,0
    //8 input file :       8.txt 
           8:128,128,128

我正在使用 KeyValueTextInputFormat

               key :file name
               value: initial coordinates

例如第 5 个文件

              key :5
              value:128,0,0

每个地图任务根据其初始坐标生成大量数据。

现在我想在每个地图任务中运行外部程序并生成输出文件。

但我很困惑如何处理 HDFS 中的文件。

         I can use zero reducer and create file in HDFS 

         Configuration conf = new Configuration();
         FileSystem fs = FileSystem.get(conf);
         Path outFile;
         outFile = new Path(INPUT_FILE_NAME);
         FSDataOutputStream out = fs.create(outFile);

         //generating data ........ and writing to HDFS 
          out.writeUTF(lon + ";" + lat + ";" + depth + ";");

我很困惑如何使用 HDFS 文件运行外部程序而不将文件放入本地目录。

  with  dfs -get 

在不使用 MR 的情况下,我使用 shell 脚本得到如下结果

#!/bin/bash

if [ $# -lt 2 ]; then
    printf "Usage: %s: <infile> <outfile> \n" $(basename $0) >&2
          exit 1
fi

IN_FILE=/Users/x34/data/$1
OUT_FILE=/Users/x34/data/$2                     

cd "/Users/x34/Projects/externalprogram/model/"

./vx < ${IN_FILE} > ${OUT_FILE}

paste ${IN_FILE} ${OUT_FILE} | awk '{print $1,"\t",$2,"\t",$3,"\t",$4,"\t",$5,"\t",$22,"\t",$23,"\t",$24}' > /Users/x34/data/combined
if [ $? -ne 0 ]; then
    exit 1
fi                      

exit 0

然后我运行它

         ProcessBuilder pb = new ProcessBuilder("SHELL_SCRIPT","in", "out"); 
         Process p = pb.start();

我非常感谢任何想法如何使用 hadoop 流或任何其他方式运行外部程序。我希望 HDFS 中的 INPUT 和 OUTPUT 文件进行进一步处理。

请帮忙

4

2 回答 2

0

因此,假设您的外部程序不知道如何识别或读取 hdfs,那么您要做的就是从 java 加载文件并将其作为输入直接传递给程序

Path path = new Path("hdfs/path/to/input/file");
FileSystem fs = FileSystem.get(configuration);
FSDataInputStream fin = fs.open(path);
ProcessBuilder pb = new ProcessBuilder("SHELL_SCRIPT");
Process p = pb.start();
OutputStream os = p.getOutputStream();
BufferedReader br = new BufferedReader(new InputStreamReader(fin));
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(os));

String line = null;
while ((line = br.readLine())!=null){
    writer.write(line);
}

输出可以以相反的方式完成。从进程中获取 InputStream,并将 FSDataOutputStream 写入 hdfs。

从本质上讲,您的具有这两件事的程序变成了将 HDFS 转换为输入并将输出转换回 HDFS 的适配器。

于 2013-05-02T23:48:41.370 回答
0

您可以为此使用 Hadoop Streaming:

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-input myInputDirs \
-output myOutputDir \
-mapper myPythonScript.py \
-reducer /bin/wc \
-file myPythonScript.py \
-file myDictionary.txt

有关一些示例,请参见https://hadoop.apache.org/docs/r1.0.4/streaming.pdf

还有一篇不错的文章: http: //princetonits.com/blog/technology/hadoop-mapreduce-streaming-using-bash-script/

Hadoop 流是 Hadoop 发行版附带的实用程序。该实用程序允许您使用任何可执行文件或脚本作为映射器和/或减速器来创建和运行 Map/Reduce 作业。

另一个例子:

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper /bin/cat \
    -reducer /bin/wc

在上面的示例中,mapper 和 reducer 都是可执行文件,它们从 stdin(逐行)读取输入并将输出发送到 stdout。该实用程序将创建一个 Map/Reduce 作业,将作业提交到适当的集群,并监控作业的进度,直到它完成。

当为映射器指定可执行文件时,每个映射器任务将在映射器初始化时将可执行文件作为单独的进程启动。当映射器任务运行时,它将其输入转换为行并将这些行提供给进程的标准输入。同时,映射器从进程的标准输出中收集面向行的输出,并将每一行转换为键/值对,作为映射器的输出收集。默认情况下,直到第一个制表符的行的前缀是键,该行的其余部分(不包括制表符)将是值。如果该行中没有制表符,则将整行视为键,值为空。但是,这可以自定义,如稍后讨论的那样。

当为 reducer 指定可执行文件时,每个 reducer 任务都会将可执行文件作为单独的进程启动,然后初始化 reducer。随着 reducer 任务的运行,它将其输入键/值对转换为行并将这些行提供给进程的标准输入。同时,reducer 从流程的 stdout 中收集面向行的输出,将每一行转换为键/值对,作为 reducer 的输出收集。默认情况下,到第一个制表符的行的前缀是键,行的其余部分(不包括制表符)是值。但是,这可以定制。

于 2017-08-16T00:09:47.827 回答