有人可以为使用 Hbase 的 mapreduce 提供一个很好的示例链接吗?我的要求是在 hdfs 文件上运行 mapreduce 并将 reducer 输出存储到 hbase 表。映射器输入将是 hdfs 文件,输出将是 Text,IntWritable 键值对。Reducers 输出将是 Put 对象,即添加 reducer Iterable IntWritable 值并存储在 hbase 表中。
问问题
2513 次
2 回答
4
这是解决您问题的代码
司机
HBaseConfiguration conf = HBaseConfiguration.create();
Job job = new Job(conf,"JOB_NAME");
job.setJarByClass(yourclass.class);
job.setMapperClass(yourMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Intwritable.class);
FileInputFormat.setInputPaths(job, new Path(inputPath));
TableMapReduceUtil.initTableReducerJob(TABLE,
yourReducer.class, job);
job.setReducerClass(yourReducer.class);
job.waitForCompletion(true);
Mapper&Reducer
class yourMapper extends Mapper<LongWritable, Text, Text,IntWritable> {
//@overide map()
}
class yourReducer
extends
TableReducer<Text, IntWritable,
ImmutableBytesWritable>
{
//@override rdeuce()
}
于 2012-11-27T12:16:57.993 回答
0
**使用 Phoenix Hbase 和 map reduce 使用对我来说可以正常工作的波纹管代码 **
该程序将从 Hbase 表中读取数据,并在 map-reduce 作业之后将结果插入到另一个表中。
表:-> 股票,STOCK_STATS
StockComputationJob.java
public static class StockMapper extends Mapper<NullWritable, StockWritable, Text , DoubleWritable> {
private Text stock = new Text();
private DoubleWritable price = new DoubleWritable ();
@Override
protected void map(NullWritable key, StockWritable stockWritable, Context context) throws IOException, InterruptedException {
double[] recordings = stockWritable.getRecordings();
final String stockName = stockWritable.getStockName();
System.out.println("Map-"+recordings);
double maxPrice = Double.MIN_VALUE;
for(double recording : recordings) {
System.out.println("M-"+key+"-"+recording);
if(maxPrice < recording) {
maxPrice = recording;
}
}
System.out.println(stockName+"--"+maxPrice);
stock.set(stockName);
price.set(maxPrice);
context.write(stock,price);
}
}
public static void main(String[] args) throws Exception {
final Configuration conf = new Configuration();
HBaseConfiguration.addHbaseResources(conf);
conf.set(HConstants.ZOOKEEPER_QUORUM, zkUrl);
final Job job = Job.getInstance(conf, "stock-stats-job");
// We can either specify a selectQuery or ignore it when we would like to retrieve all the columns
final String selectQuery = "SELECT STOCK_NAME,RECORDING_YEAR,RECORDINGS_QUARTER FROM STOCK ";
// StockWritable is the DBWritable class that enables us to process the Result of the above query
PhoenixMapReduceUtil.setInput(job,StockWritable.class,"STOCK",selectQuery);
// Set the target Phoenix table and the columns
PhoenixMapReduceUtil.setOutput(job, "STOCK_STATS", "STOCK_NAME,MAX_RECORDING");
job.setMapperClass(StockMapper.class);
job.setReducerClass(StockReducer.class);
job.setOutputFormatClass(PhoenixOutputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(DoubleWritable.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(StockWritable.class);
TableMapReduceUtil.addDependencyJars(job);
job.waitForCompletion(true);
}
}
StockReducer.java
public class StockReducer extends Reducer<Text, DoubleWritable, NullWritable , StockWritable> {
protected void reduce(Text key, Iterable<DoubleWritable> recordings, Context context) throws IOException, InterruptedException {
double maxPrice = Double.MIN_VALUE;
System.out.println(recordings);
for(DoubleWritable recording : recordings) {
System.out.println("R-"+key+"-"+recording);
if(maxPrice < recording.get()) {
maxPrice = recording.get();
}
}
final StockWritable stock = new StockWritable();
stock.setStockName(key.toString());
stock.setMaxPrice(maxPrice);
System.out.println(key+"--"+maxPrice);
context.write(NullWritable.get(),stock);
}
}
StockWritable.java
public class StockWritable implements DBWritable,Writable {
private String stockName;
private int year;
private double[] recordings;
private double maxPrice;
public void readFields(DataInput input) throws IOException {
}
public void write(DataOutput output) throws IOException {
}
public void readFields(ResultSet rs) throws SQLException {
stockName = rs.getString("STOCK_NAME");
setYear(rs.getInt("RECORDING_YEAR"));
final Array recordingsArray = rs.getArray("RECORDINGS_QUARTER");
setRecordings((double[])recordingsArray.getArray());
}
public void write(PreparedStatement pstmt) throws SQLException {
pstmt.setString(1, stockName);
pstmt.setDouble(2, maxPrice);
}
public int getYear() {
return year;
}
public void setYear(int year) {
this.year = year;
}
public double[] getRecordings() {
return recordings;
}
public void setRecordings(double[] recordings) {
this.recordings = recordings;
}
public double getMaxPrice() {
return maxPrice;
}
public void setMaxPrice(double maxPrice) {
this.maxPrice = maxPrice;
}
public String getStockName() {
return stockName;
}
public void setStockName(String stockName) {
this.stockName = stockName;
}
}
于 2018-12-04T11:02:19.897 回答