0

如何为馈送到映射器的文件的每一行提供同一文件的拆分?

基本上我想做的是

for each line in file-split
{  

    for each line in file{     
             //process
    }

}

我可以在java中使用map reduce来做到这一点吗?

4

3 回答 3

0

实际上,当触发 mapreduce 作业时,它首先检查输入文件,为简单起见考虑我们只有一个大输入文件!如果它的大小大于块大小,则作业跟踪器将该文件按块大小拆分,然后启动No. of map tasks = No. of Splits生成并将每个拆分传递给每个映射器任务进行处理。因此,每个映射器不会处理超过一个拆分。此外,如果输入文件大小小于块大小,则jobtracker会将其作为单独的拆分。

假设块大小为 64MB,并且您有 2 个文件,每个文件大小为 10MB,那么 jobtracker 将生成 2 个拆分!因为根据FileInputFormat拆分可以是单个文件(如果文件大小 <= 块大小)或文件的一部分(如果它的大小>块大小)。

因此,映射器将只处理单个拆分,而且拆分不能包含多个文件(默认格式 FileInputFormat 为 true,但在组合文件输入格式的情况下,它可以跨越多个文件)。

我猜你正在使用 FilInputFormat。!

您可以参考Hadoop: The Definitive Guide来了解其基础知识。

于 2014-03-03T12:43:26.697 回答
0

您可以在 reducer 任务中获取文件的所有行。如果它解决了您的问题,请查看:

    public class FileLineComparison {

        public static class Map extends
                Mapper<LongWritable, Text, Text, Text> {
            private Text fileName = new Text();

            public void map(LongWritable key, Text line, Context context)
                    throws IOException, InterruptedException {// Parse the input string into a nice map
                /*
                 * get file name from context and put it as key,
                 * so that reducer will get all lines of that file
                             * from one or more mappers
                 */
                 FileSplit fileSplit = (FileSplit)context.getInputSplit();
                 fileName.set( fileSplit.getPath().getName());

                 context.write(fileName, line);


            }
        }

        public static class Reduce extends
                Reducer<Text, Text, Text, Text> {

                      public void reduce(Text filename, Iterable<Text> allLinesOfsinglefile,  Context context) throws IOException, InterruptedException {
                          for (Text val : allLinesOfsinglefile) {
                              /*
                               * you get each line of the file here.
                               * if you want to compare each line with the rest, please loop again.
But in that case consider it as an iterable object
                               * do your things here
                               */
                          }
                        /*
                         * write to out put file, if required  
                         */
                      context.write(filename, filename);
                      }
                  }
    }

或者如果你真的需要它在mapper中,请在每个mapper中读取文件本身,因为文件名和路径是我们从..它得到的。split仅在文件大小较小时推荐..

于 2014-03-04T07:39:09.650 回答
0

在这里你可以怎么做:

1)在Mapper.setup()中初始化一个字符串向量(如果你的分割太大,则初始化一个文件 - 分割大小通常是~输入 n HDFS 的块大小)。

2) 在Mapper.map()中读取行并将它们添加到向量中。

3)现在你在向量中有整个分裂。您是否在Mapper.cleanup()中进行处理:例如,您可以遍历循环,并将每一行作为键写入 reducer,将拆分的所有行作为值写入。

于 2014-03-03T13:37:13.573 回答