3

我的代码如下。这是级联代码。它有8个工作。我不知道如何配置每个作业。下面的代码一起配置了 8 个作业。但我想做的是让最后一项工作减少。我想问如何识别这8个工作,以及如何分别配置它们?谢谢。

private static void Demo(String[] args) {
/*      Tap sourceTap = new Hfs(new TextLine(), "D:/test/file");
        Tap finalResultTap = new Hfs(new TextLine(), "D:/test/result", true);
*/
        Tap sourceTap = new Hfs(new TextLine(), args[0], SinkMode.KEEP);
        Tap finalResultTap = new Hfs(new TextLine(), args[1], SinkMode.REPLACE);
        Tap trap = new Hfs(new TextLine(), args[2], SinkMode.REPLACE);


        Pipe sourcePipe = new Pipe("sourcePipe");
        sourcePipe = getFilterPipe(sourcePipe);

        Pipe vvResultPipe = new Pipe("vvResultPipe", sourcePipe);
        vvResultPipe = getVVResultPipe(sourcePipe);

        Pipe clickResultPipe = new Pipe("clickResultPipe", sourcePipe);
        clickResultPipe = getClickResultPipe(clickResultPipe);

        Pipe stClickResultPipe = new Pipe("stClickResultPipe", sourcePipe);
        stClickResultPipe = getStClickResultPipe(sourcePipe);

        //连接3个pipe的结果
        Pipe resultPipe = new Pipe("resultPipe");
        resultPipe = new CoGroup(vvResultPipe, new Fields("vid"), clickResultPipe, new Fields("referVid"), 
                new Fields("vid", "totalVV", "referVid", "totalClick"), new LeftJoin());
        resultPipe = new CoGroup(resultPipe, new Fields("vid"), stClickResultPipe, new Fields("referVid"), 
                new Fields("vid", "totalVV", "referVid", "totalClick", "referVid2", "st1","st2","st3", "st4","st6", "st8"), new LeftJoin());
        resultPipe = new Each(resultPipe, new Fields("vid", "totalVV", "totalClick", "st1","st2","st3", "st4","st6", "st8"),
                new Identity(Fields.ARGS));

        Fields sortClickFields = new Fields("totalVV"); 
        resultPipe = new GroupBy(resultPipe, Fields.NONE, sortClickFields );
        sortClickFields.setComparators(Collections.reverseOrder());

/*      Limit limit = new Limit(200);
        resultPipe = new Each(resultPipe, limit);
*/

        JobConf conf = new JobConf();
        conf.setJarByClass(Main.class);

        //Properties properties = new Properties();
        Properties properties = AppProps.appProps().buildProperties(conf);
        properties.setProperty("user.group", "d_sdo_data");
        properties.setProperty("mapred.job.queue.name", "cug_d_sdo_data");
        properties.setProperty("mapred.fairscheduler.pool", "cug_d_sdo_data");
        properties.setProperty("cascading.tmp.dir", "/home/hdfs/cluster-data/tmp/mapred/staging/recommend_user/tmp");
        properties.setProperty("mapreduce.job.complete.cancel.delegation.tokens", "false");
        properties.setProperty("mapred.reduce.tasks", "30");
        properties.setProperty("mapred.map.tasks", "200");

        //AppProps.setApplicationJarClass(properties, Main.class);
        FlowConnector flowConnector = new HadoopFlowConnector(properties);
        FlowDef flowDef = FlowDef.flowDef()
                 .setName( "tfidf" )
                 .addSource( sourcePipe, sourceTap )
                 .addTailSink( resultPipe, finalResultTap )
                 .addTrap( "assertions", trap );
        Flow flow = flowConnector.connect(flowDef);
        flow.complete();
    }
4

1 回答 1

0

这个问题已经三年了,但我在寻找这个解决方案时遇到了它。这是我最终得出的结论: pipe.getStepConfigDef().setProperty("mapreduce.job.reduces", "1"); 这是在您定义了要配置的步骤之后进行的。

请注意,这适用于 Hadoop 2.6.4 - 如果您使用的是mapred.reduce.tasks属性。看这里:

https://hadoop.apache.org/docs/r2.6.4/hadoop-project-dist/hadoop-common/DeprecatedProperties.html

于 2017-01-04T16:03:05.433 回答