I have two mapper classes and two reducer classes. I want the flow to be: MapperOne --> ReducerOne --> MapperTwo --> ReducerTwo.
Here is my driver class code:
public class StockDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        System.out.println(" Driver invoked------");
        Configuration config = new Configuration();
        config.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", " ");
        config.set("mapred.textoutputformat.separator", " --> ");
        String inputPath="In\\NYSE_daily_prices_Q_less.csv";
        String outpath = "C:\\Users\\Desktop\\Hadoop\\run1";
        String outpath2 = "C:\\Users\\Desktop\\Hadoop\\run2";
        Job job1 = new Job(config,"Stock Analysis: Creating key values");
        job1.setInputFormatClass(TextInputFormat.class);
        job1.setOutputFormatClass(SequenceFileOutputFormat.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(StockDetailsTuple.class);
        job1.setMapperClass(StockMapperOne.class);
        job1.setReducerClass(StockReducerOne.class);
        FileInputFormat.setInputPaths(job1, new Path(inputPath));
        SequenceFileOutputFormat.setOutputPath(job1, new Path(outpath));
        //FileOutputFormat.setOutputPath(job1, new Path(outpath));
        //THE SECOND MAP_REDUCE TO DO CALCULATIONS
        Job job2 = new Job(config,"Stock Analysis: Calculating Covariance");
        job2.setInputFormatClass(SequenceFileInputFormat.class);
        job2.setOutputFormatClass(TextOutputFormat.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);
        job2.setMapperClass(StockMapperTwo.class);
        job2.setReducerClass(StockReducerTwo.class);
        SequenceFileInputFormat.setInputPaths(job2, new Path(outpath));
        FileOutputFormat.setOutputPath(job2, new Path(outpath2));
        System.out.println(job1.waitForCompletion(true));
        System.out.println(job2.waitForCompletion(true));
    }
}
My MapperOne class:
public class StockMapperOne extends Mapper<LongWritable, Text, Text, StockDetailsTuple> {
    private StockDetailsTuple stock= new StockDetailsTuple();
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
        String valueString = value.toString();
        String[] tokenArray = valueString.split(",");
        String dateOfStock= tokenArray[2];
        int month = Integer.parseInt(dateOfStock.substring(5, 7));
        int year = Integer.parseInt(dateOfStock.substring(0,4));
        stock.setStockDate(dateOfStock);
        stock.setStockName(tokenArray[1]);
        stock.setStockPrice(Float.parseFloat(tokenArray[4]));
        stock.setMonthNum(month);
        stock.setYear(year);
        System.out.println(" Date of stock: "+dateOfStock + " stock: "+stock);
        context.write(new Text(dateOfStock.trim()), stock);
    }
}
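To make the field extraction in the mapper concrete, here is a minimal standalone sketch without Hadoop. It assumes the date column has the form yyyy-MM-dd, which is what the substring(0,4) and substring(5,7) calls imply. The class name DateFieldSketch and the sample row are made up for illustration and are not taken from the real input file.

```java
class DateFieldSketch {
    // Year from a date assumed to look like "yyyy-MM-dd".
    static int yearOf(String date) {
        return Integer.parseInt(date.substring(0, 4));
    }

    // Month from a date assumed to look like "yyyy-MM-dd".
    static int monthOf(String date) {
        return Integer.parseInt(date.substring(5, 7));
    }

    public static void main(String[] args) {
        // Hypothetical sample row; column 1 = symbol, 2 = date, 4 = price, as in the mapper.
        String row = "NYSE,QTM,2010-02-08,2.37,2.42,2.29,2.36,3013600,2.36";
        String[] tokens = row.split(",");
        System.out.println(tokens[1] + " " + yearOf(tokens[2]) + " "
                + monthOf(tokens[2]) + " " + Float.parseFloat(tokens[4]));
    }
}
```

A row with a differently formatted date or fewer than five columns would throw here, and presumably in the mapper as well.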
Reducer One class (I set the output key/value pair to Text, Text):
public class StockReducerOne extends Reducer<Text, StockDetailsTuple, Text, Text> {
    public void reduce(Text key, Iterable<StockDetailsTuple> values, Context context) throws IOException, InterruptedException{
        for(StockDetailsTuple val: values){
            System.out.println("VAL : "+ val);
            priceMap.put(val.getStockName(),val.getStockPrice());
            stockGroups = stockGroups.append(val.getStockName()).append(":");
            month=val.getMonthNum();
            count++;
        }
        if(count>1){
            stockGroupAfterPermutations =doPermutation(stockGroups.toString());
            formNewKey(context,this.stockGroupAfterPermutations, priceMap, month);
        }
    }
    private void formNewKey(Context context,List<String> stockGroup, Map<String, Float> priceMap2,int month2)
            throws IOException, InterruptedException {
        System.out.println(" FORMING NEW KEY----------");
        String tempKey=null,tempVal = null ;
        for(String stkgrp:stockGroup){
            String[] splitTokens = stkgrp.split(",");
            //The below line gives the key as 3 QRR,QTM
            // value as 12.22 13.33
            Arrays.sort(splitTokens);
            //System.out.println(" SORTED ARRAY: --> "+ splitTokens);
            tempKey=String.valueOf(month2)+","+splitTokens[0]+","+splitTokens[1];
            tempVal=splitTokens[0]+","+String.valueOf(priceMap2.get(splitTokens[0]))+","+splitTokens[1]+","+String.valueOf(priceMap2.get(splitTokens[1]));
            //Finally our key value pair looks like
            //7,QTM,QXM --> QTM,3.22,QXM,9.61
        }
        System.out.println(" NEW KEY: "+tempKey +" NEW VAL: "+ tempVal);
        context.write(new Text(tempKey), new Text(tempVal));
    }
}
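The key/value construction in formNewKey can be tried out in isolation. The following is a rough sketch in plain Java, using the example pair from the comments above (7,QTM,QXM --> QTM,3.22,QXM,9.61). The class and method names (PairKeySketch, pairKey, pairValue) are hypothetical and exist only for this illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

class PairKeySketch {
    // Builds "month,A,B" with the two symbols of a "X,Y" pair in sorted order.
    static String pairKey(int month, String pair) {
        String[] symbols = pair.split(",");
        Arrays.sort(symbols);
        return month + "," + symbols[0] + "," + symbols[1];
    }

    // Builds "A,priceA,B,priceB" with the symbols in the same sorted order.
    static String pairValue(String pair, Map<String, Float> prices) {
        String[] symbols = pair.split(",");
        Arrays.sort(symbols);
        return symbols[0] + "," + prices.get(symbols[0]) + ","
                + symbols[1] + "," + prices.get(symbols[1]);
    }

    public static void main(String[] args) {
        Map<String, Float> prices = new HashMap<>();
        prices.put("QTM", 3.22f);
        prices.put("QXM", 9.61f);
        // The pair is given unsorted on purpose; sorting makes the key order-independent.
        System.out.println(pairKey(7, "QXM,QTM") + " --> " + pairValue("QXM,QTM", prices));
    }
}
```

Because the symbols are sorted before the key is built, "QXM,QTM" and "QTM,QXM" map to the same key, which is what lets the second job group both orderings together.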
MY WRITABLE StockDetailsTuple Class
-------------------------------------
public class StockDetailsTuple implements Writable{
    private String stockName;
    private int monthNum;
    private int year;
    private float stockPrice;
    private String stockDate;
    @Override
    public void readFields(DataInput in) throws IOException {
        this.monthNum=in.readInt();
        this.year=in.readInt();
        this.stockPrice=in.readFloat();
        this.stockName=in.readUTF();
        this.stockDate=in.readUTF();
    }
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.monthNum);
        out.writeInt(this.year);
        out.writeFloat(this.stockPrice);
        out.writeUTF(this.stockName);
        out.writeUTF(this.stockDate);
    }
    public String getStockName() {
        return stockName;
    }
    public void setStockName(String stockName) {
        this.stockName = stockName;
    }
    public int getMonthNum() {
        return monthNum;
    }
    public void setMonthNum(int monthNum) {
        this.monthNum = monthNum;
    }
    public int getYear() {
        return year;
    }
    public void setYear(int year) {
        this.year = year;
    }
    public float getStockPrice() {
        return stockPrice;
    }
    public void setStockPrice(float stockPrice) {
        this.stockPrice = stockPrice;
    }
    public String getStockDate() {
        return stockDate;
    }
    public void setStockDate(String stockDate) {
        this.stockDate = stockDate;
    }
    @Override
    public String toString() {
        return "StockDetailsTuple [stockName=" + stockName + ", monthNum="
                + monthNum + ", year=" + year + ", stockPrice=" + stockPrice
                + ", stockDate=" + stockDate + "]";
    }
}
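One way to check the serialization independently of Hadoop is a round trip through java.io streams. The sketch below uses a stand-in class, TupleRoundTripSketch (a hypothetical name), that mirrors the five fields and the write/readFields order of StockDetailsTuple but does not implement the Hadoop Writable interface, so it runs with the JDK alone.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class TupleRoundTripSketch {
    int monthNum;
    int year;
    float stockPrice;
    String stockName;
    String stockDate;

    // Same field order as StockDetailsTuple.write.
    void write(DataOutput out) throws IOException {
        out.writeInt(monthNum);
        out.writeInt(year);
        out.writeFloat(stockPrice);
        out.writeUTF(stockName);
        out.writeUTF(stockDate);
    }

    // Must read the fields back in exactly the order they were written.
    void readFields(DataInput in) throws IOException {
        monthNum = in.readInt();
        year = in.readInt();
        stockPrice = in.readFloat();
        stockName = in.readUTF();
        stockDate = in.readUTF();
    }

    // Serializes to a byte array and deserializes into a fresh instance.
    static TupleRoundTripSketch roundTrip(TupleRoundTripSketch original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));
            TupleRoundTripSketch copy = new TupleRoundTripSketch();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        TupleRoundTripSketch t = new TupleRoundTripSketch();
        t.monthNum = 7;
        t.year = 2010;
        t.stockPrice = 3.22f;
        t.stockName = "QTM";
        t.stockDate = "2010-07-15"; // made-up date for the example
        TupleRoundTripSketch copy = roundTrip(t);
        System.out.println(copy.stockName + " " + copy.stockDate + " " + copy.stockPrice);
    }
}
```

Note that writeUTF throws a NullPointerException when given null, so both string fields need to be set before the tuple is written.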
When I run this code, ReducerOne throws an exception:
13/07/15 23:11:49 WARN mapred.LocalJobRunner: job_local_0001
java.io.IOException: wrong value class: org.apache.hadoop.io.Text is not class com.assignment.StockDetailsTuple
at org.apache.hadoop.io.SequenceFile$Writer.append(SequenceFile.java:1050)
at org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat$1.write(SequenceFileOutputFormat.java:74)
at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:588)
at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
If I change the value type in ReducerOne to StockDetailsTuple, the code runs. What am I missing here? I want the reducer's output key and value to be Text, Text. Please help.
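A likely reading of the stack trace: the exception comes from SequenceFile.Writer.append, which compares the class of each written value against the value class the job declared. In the driver, job1 declares StockDetailsTuple via setOutputValueClass, while StockReducerOne writes Text. The sketch below models that check with JDK types only, as an illustration rather than Hadoop's actual code: TypedWriter is a hypothetical stand-in for the sequence file writer, String stands in for Text, and Integer for StockDetailsTuple.

```java
import java.io.IOException;

class ValueClassCheckSketch {
    // Simplified stand-in for a writer created with a declared value class.
    static final class TypedWriter {
        private final Class<?> valueClass;

        TypedWriter(Class<?> valueClass) {
            this.valueClass = valueClass;
        }

        // Rejects any value whose runtime class differs from the declared class.
        void append(Object value) throws IOException {
            if (value.getClass() != valueClass) {
                throw new IOException("wrong value class: "
                        + value.getClass().getName() + " is not " + valueClass);
            }
        }
    }

    // Returns "ok" or the exception message, so both outcomes can be inspected.
    static String tryAppend(Class<?> declared, Object value) {
        try {
            new TypedWriter(declared).append(value);
            return "ok";
        } catch (IOException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // Declared Integer (think StockDetailsTuple), wrote String (think Text): rejected.
        System.out.println(tryAppend(Integer.class, "QTM,3.22,QXM,9.61"));
        // Declared and written classes agree: accepted.
        System.out.println(tryAppend(String.class, "QTM,3.22,QXM,9.61"));
    }
}
```

If this reading is right, the corresponding driver change would be to declare the mapper's output separately with job1.setMapOutputKeyClass(Text.class) and job1.setMapOutputValueClass(StockDetailsTuple.class), and to set job1.setOutputValueClass(Text.class) for the reducer's output. StockMapperTwo would then presumably need Text, Text as its input types, since that is what job1 writes to the sequence file.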