5

我在一个小文件(3-4 MB)上执行地图任务,但地图输出相对较大(150 MB)。显示地图 100% 后,需要很长时间才能完成溢出。请建议我怎样才能减少这个时间。以下是一些示例日志...

13/07/10 17:45:31 INFO mapred.MapTask: Starting flush of map output
13/07/10 17:45:32 INFO mapred.JobClient:  map 98% reduce 0%
13/07/10 17:45:34 INFO mapred.LocalJobRunner: 
13/07/10 17:45:35 INFO mapred.JobClient:  map 100% reduce 0%
13/07/10 17:45:37 INFO mapred.LocalJobRunner: 
13/07/10 17:45:40 INFO mapred.LocalJobRunner: 
13/07/10 17:45:43 INFO mapred.LocalJobRunner: 
13/07/10 17:45:46 INFO mapred.LocalJobRunner: 
13/07/10 17:45:49 INFO mapred.LocalJobRunner: 
13/07/10 17:45:52 INFO mapred.LocalJobRunner: 
13/07/10 17:45:55 INFO mapred.LocalJobRunner: 
13/07/10 17:45:58 INFO mapred.LocalJobRunner: 
13/07/10 17:46:01 INFO mapred.LocalJobRunner: 
13/07/10 17:46:04 INFO mapred.LocalJobRunner: 
13/07/10 17:46:07 INFO mapred.LocalJobRunner: 
13/07/10 17:46:10 INFO mapred.LocalJobRunner: 
13/07/10 17:46:13 INFO mapred.LocalJobRunner: 
13/07/10 17:46:16 INFO mapred.LocalJobRunner: 
13/07/10 17:46:19 INFO mapred.LocalJobRunner: 
13/07/10 17:46:22 INFO mapred.LocalJobRunner: 
13/07/10 17:46:25 INFO mapred.LocalJobRunner: 
13/07/10 17:46:28 INFO mapred.LocalJobRunner: 
13/07/10 17:46:31 INFO mapred.LocalJobRunner: 
13/07/10 17:46:34 INFO mapred.LocalJobRunner: 
13/07/10 17:46:37 INFO mapred.LocalJobRunner: 
13/07/10 17:46:40 INFO mapred.LocalJobRunner: 
13/07/10 17:46:43 INFO mapred.LocalJobRunner: 
13/07/10 17:46:46 INFO mapred.LocalJobRunner: 
13/07/10 17:46:49 INFO mapred.LocalJobRunner: 
13/07/10 17:46:52 INFO mapred.LocalJobRunner: 
13/07/10 17:46:55 INFO mapred.LocalJobRunner: 
13/07/10 17:46:58 INFO mapred.LocalJobRunner: 
13/07/10 17:47:01 INFO mapred.LocalJobRunner: 
13/07/10 17:47:04 INFO mapred.LocalJobRunner: 
13/07/10 17:47:07 INFO mapred.LocalJobRunner: 
13/07/10 17:47:10 INFO mapred.LocalJobRunner: 
13/07/10 17:47:13 INFO mapred.LocalJobRunner: 
13/07/10 17:47:16 INFO mapred.LocalJobRunner: 
13/07/10 17:47:19 INFO mapred.LocalJobRunner: 
13/07/10 17:47:22 INFO mapred.LocalJobRunner: 
13/07/10 17:47:25 INFO mapred.LocalJobRunner: 
13/07/10 17:47:28 INFO mapred.LocalJobRunner: 
13/07/10 17:47:31 INFO mapred.LocalJobRunner: 
13/07/10 17:47:34 INFO mapred.LocalJobRunner: 
13/07/10 17:47:37 INFO mapred.LocalJobRunner: 
13/07/10 17:47:40 INFO mapred.LocalJobRunner: 
13/07/10 17:47:43 INFO mapred.LocalJobRunner: 
13/07/10 17:47:45 INFO mapred.MapTask: Finished spill 0
13/07/10 17:47:45 INFO mapred.Task: Task:attempt_local_0003_m_000000_0 is done. And is in the process of commiting
13/07/10 17:47:45 INFO mapred.LocalJobRunner: 
13/07/10 17:47:45 INFO mapred.Task: Task 'attempt_local_0003_m_000000_0' done.
...............................
...............................
...............................
13/07/10 17:47:52 INFO mapred.JobClient: Counters: 22
13/07/10 17:47:52 INFO mapred.JobClient:   File Output Format Counters 
13/07/10 17:47:52 INFO mapred.JobClient:     Bytes Written=13401245
13/07/10 17:47:52 INFO mapred.JobClient:   FileSystemCounters
13/07/10 17:47:52 INFO mapred.JobClient:     FILE_BYTES_READ=18871098
13/07/10 17:47:52 INFO mapred.JobClient:     HDFS_BYTES_READ=7346566
13/07/10 17:47:52 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=35878426
13/07/10 17:47:52 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=18621307
13/07/10 17:47:52 INFO mapred.JobClient:   File Input Format Counters 
13/07/10 17:47:52 INFO mapred.JobClient:     Bytes Read=2558288
13/07/10 17:47:52 INFO mapred.JobClient:   Map-Reduce Framework
13/07/10 17:47:52 INFO mapred.JobClient:     Reduce input groups=740000
13/07/10 17:47:52 INFO mapred.JobClient:     Map output materialized bytes=13320006
13/07/10 17:47:52 INFO mapred.JobClient:     Combine output records=740000
13/07/10 17:47:52 INFO mapred.JobClient:     Map input records=71040
13/07/10 17:47:52 INFO mapred.JobClient:     Reduce shuffle bytes=0
13/07/10 17:47:52 INFO mapred.JobClient:     Physical memory (bytes) snapshot=0
13/07/10 17:47:52 INFO mapred.JobClient:     Reduce output records=740000
13/07/10 17:47:52 INFO mapred.JobClient:     Spilled Records=1480000
13/07/10 17:47:52 INFO mapred.JobClient:     Map output bytes=119998400
13/07/10 17:47:52 INFO mapred.JobClient:     CPU time spent (ms)=0
13/07/10 17:47:52 INFO mapred.JobClient:     Total committed heap usage (bytes)=1178009600
13/07/10 17:47:52 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=0
13/07/10 17:47:52 INFO mapred.JobClient:     Combine input records=7499900
13/07/10 17:47:52 INFO mapred.JobClient:     Map output records=7499900
13/07/10 17:47:52 INFO mapred.JobClient:     SPLIT_RAW_BYTES=122
13/07/10 17:47:52 INFO mapred.JobClient:     Reduce input records=740000

地图任务源码:

public class GsMR2MapThree extends Mapper<Text, Text, LongWritable,DoubleWritable>{

    private DoubleWritable distGexpr = new DoubleWritable();
    private LongWritable m2keyOut = new LongWritable();
    int trMax,tstMax;

    protected void setup(Context context) throws java.io.IOException, java.lang.InterruptedException {

        Configuration conf =context.getConfiguration();
        tstMax = conf.getInt("mtst", 10);
        trMax = conf.getInt("mtr", 10);

    }

    public void map(Text key, Text values, Context context) throws IOException, InterruptedException {
        String line = values.toString();

        double Tij=0.0,TRij=0.0, dist=0;
        int i=0,j;
        long m2key=0;
        String[] SLl = new String[]{};

        Configuration conf =context.getConfiguration();

        m2key = Long.parseLong(key.toString());
        StringTokenizer tokenizer = new StringTokenizer(line);
        j=0;
        while (tokenizer.hasMoreTokens()) {

            String test = tokenizer.nextToken();
            if(j==0){
                Tij = Double.parseDouble(test);
            }
            else if(j==1){
                TRij = Double.parseDouble(test);
            }
            else if(j==2){
                SLl = StringUtils.split(conf.get(test),",");
            }
            j++;
        }
        //Map input ends

        //Distance Measure function
        dist = (long)Math.pow( (Tij - TRij), 2);

        //remove gid from key 
        m2key = m2key / 100000;
        //Map2 <key,value> emit starts
        for(i=0; i<SLl.length;i++){
               long m2keyNew = (Integer.parseInt(SLl[i])*(trMax*tstMax))+m2key;
            m2keyOut.set(m2keyNew);
            distGexpr.set(dist);
            context.write(m2keyOut,distGexpr);
        }
        //<key,value> emit done
    }

}

示例地图输入:每行中的最后一个变量从广播变量中获取一个整数数组。每行将产生大约 100-200 条输出记录。

10100014    1356.3238 1181.63 gs-4-56
10100026    3263.1167 3192.4131 gs-3-21
10100043    1852.0 1926.3962 gs-4-76
10100062    1175.5925 983.47125 gs-3-19
10100066    606.59125 976.26625 gs-8-23

示例地图输出:

10101   8633.0
10102   1822.0
10103   13832.0
10104   2726470.0
10105   1172991.0
10107   239367.0
10109   5410384.0
10111   7698352.0
10112   6.417
4

1 回答 1

0

我想您已经解决了这个问题(发布原始消息 2 年后),但是对于遇到相同问题的任何人,我会尝试提供一些建议。

从你的计数器来看,我知道你已经使用了压缩(因为映射输出物化字节的数量与映射输出字节的数量不同),这是一件好事。您可以通过使用可变长度编码的VLongWritable类作为映射输出键类型来进一步压缩映射器的输出。(如果我没记错的话,以前也有一个 VDoubleWritable 类,但它现在肯定已经被弃用了)。

在发出输出的 for 循环中,无需distGexpr每次都设置变量。它总是一样的,所以在 for 循环之前设置它。您还可以将 long 与trMax*tstMax循环外的乘积一起存储,而不是在每次迭代时计算它。

如果可能,使您的输入键 LongWritable(来自上一个作业),以便您可以保存Long.parseLong()Text.toString()调用。

如果可能(取决于您的减速器),请使用组合器来减少溢出字节的大小。

我找不到Integer.parseInt()在 for 循环中跳过该调用的方法,但如果您最初可以加载SLlint[].

于 2015-07-17T11:08:02.220 回答