我正在尝试在独立的 HBase(0.94.11) 上执行 MR 代码。
我已经阅读了 HBase api 并修改了我的 MR 代码以从 HBase 表中读取数据和写入结果,并且我在 reduce 阶段遇到了异常。提供部分(不包括业务逻辑)代码
SentimentCalculatorHBase - 工具/主类:
package com.hbase.mapreduce;
import java.util.Calendar;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class SentimentCalculatorHBase extends Configured implements Tool {
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
SentimentCalculatorHBase sentimentCalculatorHBase = new SentimentCalculatorHBase();
ToolRunner.run(sentimentCalculatorHBase, args);
}
@Override
public int run(String[] arg0) throws Exception {
// TODO Auto-generated method stub
System.out
.println("***********************Configuration started***********************");
Configuration configuration = getConf();
System.out.println("Conf: " + configuration);
Job sentiCalcJob = new Job(configuration, "HBase SentimentCalculation");
sentiCalcJob.setJarByClass(SentimentCalculatorHBase.class);
sentiCalcJob.setMapperClass(SentimentCalculationHBaseMapper.class);
sentiCalcJob.setCombinerClass(SentimentCalculationHBaseReducer.class);
sentiCalcJob.setReducerClass(SentimentCalculationHBaseReducer.class);
sentiCalcJob.setInputFormatClass(TableInputFormat.class);
sentiCalcJob.setOutputFormatClass(TableOutputFormat.class);
/* Start : Added out of exasperation! */
sentiCalcJob.setOutputKeyClass(ImmutableBytesWritable.class);
sentiCalcJob.setOutputValueClass(Put.class);
/* End : Added out of exasperation! */
Scan twitterdataUserScan = new Scan();
twitterdataUserScan.setCaching(500);
twitterdataUserScan.addColumn("word_attributes".getBytes(),
"TwitterText".getBytes());
TableMapReduceUtil.initTableMapperJob("twitterdata_user",
twitterdataUserScan, SentimentCalculationHBaseMapper.class,
Text.class, Text.class, sentiCalcJob);
TableMapReduceUtil.initTableReducerJob("sentiment_output",
SentimentCalculationHBaseReducer.class, sentiCalcJob);
Calendar beforeJob = Calendar.getInstance();
System.out.println("Job Time started---------------- "
+ beforeJob.getTime());
boolean check = sentiCalcJob.waitForCompletion(true);
if (check == true) {
System.out
.println("*******************Job completed- SentimentCalculation********************");
}
Calendar afterJob = Calendar.getInstance();
System.out
.println("Job Time ended SentimentCalculation---------------- "
+ afterJob.getTime());
return 0;
}
}
映射器类:
public class SentimentCalculationHBaseMapper extends TableMapper<Text, Text> {
private Text sentenseOriginal = new Text();
private Text sentenseParsed = new Text();
@Override
protected void map(
ImmutableBytesWritable key,
Result value,
org.apache.hadoop.mapreduce.Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context)
throws IOException, InterruptedException {
context.write(this.sentenseOriginal, this.sentenseParsed);
}
}
减速机:
public class SentimentCalculationHBaseReducer extends
TableReducer<Text, Text, ImmutableBytesWritable> {
@Override
protected void reduce(
Text key,
java.lang.Iterable<Text> values,
org.apache.hadoop.mapreduce.Reducer<Text, Text, ImmutableBytesWritable, org.apache.hadoop.io.Writable>.Context context)
throws IOException, InterruptedException {
Double mdblSentimentOverall = 0.0;
String d3 = key + "@12321@" + s11.replaceFirst(":::", "")
+ "@12321@" + mstpositiveWords + "@12321@"
+ mstnegativeWords + "@12321@" + mstneutralWords;
System.out.println("d3 : " + d3 + " , mdblSentimentOverall : "
+ mdblSentimentOverall);
Put put = new Put(d3.getBytes());
put.add(Bytes.toBytes("word_attributes"),
Bytes.toBytes("mdblSentimentOverall"),
Bytes.toBytes(mdblSentimentOverall));
System.out.println("Context is " + context);
context.write(new ImmutableBytesWritable(d3.getBytes()), put);
}
}
我得到的例外是:
13/09/05 16:16:17 INFO mapred.JobClient: map 0% reduce 0%
13/09/05 16:23:31 INFO mapred.JobClient: Task Id : attempt_201309051437_0005_m_000000_0, Status : FAILED
java.io.IOException: wrong key class: class org.apache.hadoop.hbase.io.ImmutableBytesWritable is not class org.apache.hadoop.io.Text
at org.apache.hadoop.mapred.IFile$Writer.append(IFile.java:164)
at org.apache.hadoop.mapred.Task$CombineOutputCollector.collect(Task.java:1168)
at org.apache.hadoop.mapred.Task$NewCombinerRunner$OutputConverter.write(Task.java:1492)
at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
at com.hbase.mapreduce.SentimentCalculationHBaseReducer.reduce(SentimentCalculationHBaseReducer.java:199)
at com.hbase.mapreduce.SentimentCalculationHBaseReducer.reduce(SentimentCalculationHBaseReducer.java:1)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
at org.apache.hadoop.mapred.Task$NewCombinerRunner.combine(Task.java:1513)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1436)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1298)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.close(MapTask.java:699)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:766)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1149)
at org.apache.hadoop.mapred.Child.main(Child.java:249)
为了解决它,我更改了 reducer 的签名:
public class SentimentCalculationHBaseReducer extends
TableReducer<Text, Text, Text>{
@Override
protected void reduce(
Text key,
java.lang.Iterable<Text> values,
org.apache.hadoop.mapreduce.Reducer<Text, Text, Text, org.apache.hadoop.io.Writable>.Context context)
throws IOException, InterruptedException {
context.write(new Text(d3.getBytes()), put);
}
}
但是后来我得到了一个错误,这次的值:
13/09/05 15:55:20 INFO mapred.JobClient: Task Id : attempt_201309051437_0004_m_000000_0, Status : FAILED
java.io.IOException: wrong value class: class org.apache.hadoop.hbase.client.Put is not class org.apache.hadoop.io.Text
无法弄清楚 HBase MR api 发生了什么问题!