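I'm running into the same problem described in this question (Type mismatch in key from map when replacing Mapper with MultithreadMapper), but the answer there doesn't work for me.
The error message I get is: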
13/09/17 10:37:38 INFO mapred.JobClient: Task Id : attempt_201309170943_0006_m_000000_0, Status : FAILED
java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, recieved org.apache.hadoop.io.LongWritable
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1019)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:690)
    at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
    at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:364)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)
Here is my main method:
public static void main(String[] args) throws Exception {
    Configuration config = new Configuration();
    if (args.length != 5) {
        System.out.println("Invalid Arguments");
        print_usage();
        throw new IllegalArgumentException();
    }
    config.set("myfirstdata", args[0]);
    config.set("myseconddata", args[1]);
    config.set("mythirddata", args[2]);
    config.set("mykeyattribute", "GK");
    config.setInt("myy", 50);
    config.setInt("myx", 49);
    // additional attributes
    config.setInt("myobjectid", 1);
    config.setInt("myplz", 3);
    config.setInt("mygenm", 4);
    config.setInt("mystnm", 6);
    config.setInt("myhsnr", 7);
    config.set("mapred.textoutputformat.separator", ";");
    Job job = new Job(config);
    job.setJobName("MySample");
    // set the map output types for the Job
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    // set the final output types for the Job
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    MultithreadedMapper.setMapperClass(job, MyMapper.class);
    job.setReducerClass(MyReducer.class);
    // In our case, the combiner is the same as the reducer. This is
    // possible for reducers that are both commutative and associative
    job.setCombinerClass(MyReducer.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    TextInputFormat.setInputPaths(job, new Path(args[3]));
    TextOutputFormat.setOutputPath(job, new Path(args[4]));
    job.setJarByClass(MySampleDriver.class);
    MultithreadedMapper.setNumberOfThreads(job, 2);
    // main must return void, so report the job status via the exit code
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
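One observation on the stack trace: the frame `org.apache.hadoop.mapreduce.Mapper.map` belongs to the base `Mapper` class, whose default `map` writes the input key and value through unchanged. With `TextInputFormat` the input key is a `LongWritable`, which would match the reported mismatch. That suggests the job may be running the default mapper rather than `MyMapper`. Below is a minimal sketch of how `MultithreadedMapper` is usually wired up, assuming that is the cause here. The class `MultithreadedMapperSetup` and the method `configure` are hypothetical names used only for illustration; the three Hadoop calls are the existing `Job` and `MultithreadedMapper` methods.

```java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;

// Hypothetical helper, not part of the original driver.
public final class MultithreadedMapperSetup {

    private MultithreadedMapperSetup() {
    }

    /**
     * Registers MultithreadedMapper as the job's mapper and tells it
     * which mapper class to run inside its worker threads.
     */
    static void configure(
            Job job,
            Class<? extends Mapper<LongWritable, Text, Text, IntWritable>> delegate,
            int threads) {
        // The job itself runs MultithreadedMapper. Without this call the
        // job falls back to the base Mapper, which passes keys through as-is.
        job.setMapperClass(MultithreadedMapper.class);
        // MultithreadedMapper then delegates each record to this class.
        MultithreadedMapper.setMapperClass(job, delegate);
        MultithreadedMapper.setNumberOfThreads(job, threads);
    }
}
```

In the driver above this would correspond to calling `MultithreadedMapperSetup.configure(job, MyMapper.class, 2)` in place of the two `MultithreadedMapper` calls. Note that `MultithreadedMapper` runs several instances of the delegate concurrently, so any static state shared by `MyMapper` instances would need to be thread-safe.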
The mapper code looks like this:
public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    ...
    /**
     * Sets up mapper with filter geometry provided as argument[0] to the jar
     */
    @Override
    public void setup(Context context) {
        ...
    }

    @Override
    public void map(LongWritable key, Text val, Context context)
            throws IOException, InterruptedException {
        ...
        // We know that the first line of the CSV is just headers, so at byte
        // offset 0 we can just return
        if (key.get() == 0)
            return;
        String line = val.toString();
        String[] values = line.split(";");
        float latitude = Float.parseFloat(values[latitudeIndex]);
        float longitude = Float.parseFloat(values[longitudeIndex]);
        ...
        // Create our Point directly from longitude and latitude
        Point point = new Point(longitude, latitude);
        IntWritable one = new IntWritable();
        if (...) {
            int name = ...
            one.set(name);
            String out = ...
            context.write(new Text(out), one);
        } else {
            String out = ...
            context.write(new Text(out), new IntWritable(-1));
        }
    }
}