0

我正在尝试编写一个 MapReduce 场景,其中我以 JSON 的形式创建了一些用户 ClickStream 数据。之后,我编写了 Mapper 类以从文件中获取所需的数据,我的映射器代码是:-

private final static String URL = "u";

private final static String Country_Code = "c";

private final static String Known_User = "nk";

private final static String Session_Start_time = "hc";

private final static String User_Id = "user";

private final static String Event_Id = "event";

public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    String aJSONRecord = value.toString();
    try {
        JSONObject aJSONObject = new JSONObject(aJSONRecord);
        StringBuilder aOutputString = new StringBuilder();
        aOutputString.append(aJSONObject.get(User_Id).toString()+",");
        aOutputString.append(aJSONObject.get(Event_Id).toString()+",");
        aOutputString.append(aJSONObject.get(URL).toString()+",");
        aOutputString.append(aJSONObject.get(Known_User)+",");
        aOutputString.append(aJSONObject.get(Session_Start_time)+",");
        aOutputString.append(aJSONObject.get(Country_Code)+",");
        context.write(new Text(aOutputString.toString()), key);
        System.out.println(aOutputString.toString());
    } catch (JSONException e) {
        e.printStackTrace();
    }
}

}

我的减速器代码是:-

public void reduce(Text key, Iterable<LongWritable> values,
        Context context) throws IOException, InterruptedException {
        String aString =  key.toString();
        context.write(new Text(aString.trim()), new Text(""));  

}

我的分区代码是:-

public int getPartition(Text key, LongWritable value, int numPartitions) {
    String aRecord = key.toString();
    if(aRecord.contains(Country_code_Us)){
        return 0;
    }else{
        return 1;
    }
}

这是我的驱动程序代码

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "Click Stream Analyzer");
    job.setNumReduceTasks(2);
    job.setJarByClass(ClickStreamDriver.class);
    job.setMapperClass(ClickStreamMapper.class);
    job.setReducerClass(ClickStreamReducer.class);
    job.setPartitionerClass(ClickStreamPartitioner.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(LongWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);

}

在这里,我试图根据国家代码划分我的数据。但它不起作用,它在一个减速器文件中发送每条记录,我认为是为美国减速器创建的文件以外的文件。

当我看到映射器的输出时,还有一件事,它显示在每条记录的末尾添加了一些额外的空间。

如果我在这里犯了任何错误,请提出建议。

4

3 回答 3

0

除非您有非常具体的要求,否则您可以为作业参数设置如下减速器。

mapred.reduce.tasks (in 1.x) & mapreduce.job.reduces(2.x)

或者

job.setNumReduceTasks(2)根据mark91的答案。

但是通过使用下面的 API 将工作留给 Hadoop 框架。框架将根据文件和块大小决定减速器的数量。

job.setPartitionerClass(HashPartitioner.class);
于 2015-11-25T10:20:20.607 回答
0

我使用了 NullWritable 并且它有效。现在我可以看到记录被划分在不同的文件中。由于我使用 longwritable 作为 null 值而不是 null writable ,因此在每行的最后添加了空格,并且由于这个 US 被列为“US”,并且分区无法划分订单。

于 2015-11-26T12:09:19.260 回答
0

您的分区问题是由于减速器的数量造成的。如果它是 1,那么您的所有数据都将发送给它,与您从分区器返回的内容无关。因此,设置mapred.reduce.tasks为 2 将解决此问题。或者你可以简单地写:

job.setNumReduceTasks(2);

为了有你想要的 2 个减速器。

于 2015-11-25T10:03:42.857 回答