1

有人可以提供一个在 MapReduce 中使用 HBase 的好示例的链接吗?我的需求是对 HDFS 文件运行 MapReduce,并将 reducer 的输出存储到 HBase 表中。Mapper 的输入是 HDFS 文件,输出是 Text、IntWritable 键值对。Reducer 的输出是 Put 对象,即把 reducer 收到的 Iterable<IntWritable> 值累加后存入 HBase 表。

4

2 回答 2

4

这是解决您问题的代码



驱动程序(Driver)

// Driver: wires an HDFS-file mapper to a reducer that writes into the HBase table TABLE.
// HBaseConfiguration.create() returns a plain Hadoop Configuration, so that is the declared type.
Configuration conf = HBaseConfiguration.create();
// Job.getInstance replaces the deprecated Job constructor.
Job job = Job.getInstance(conf, "JOB_NAME");
job.setJarByClass(yourclass.class);
job.setMapperClass(yourMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path(inputPath));
// initTableReducerJob registers the reducer class and the table output format on the job,
// so no separate job.setReducerClass(...) call is needed afterwards.
TableMapReduceUtil.initTableReducerJob(TABLE, yourReducer.class, job);
job.waitForCompletion(true);


Mapper&Reducer

// Mapper skeleton: declared with LongWritable/Text input types and Text/IntWritable output types.
class yourMapper extends Mapper<LongWritable, Text, Text,IntWritable> {
// TODO: override map() here.
 }

// Reducer skeleton: declared with Text/IntWritable input types and an ImmutableBytesWritable output key.
class yourReducer
        extends
        TableReducer<Text, IntWritable, 
        ImmutableBytesWritable>
{
// TODO: override reduce() here; presumably it builds the Put that is stored in the HBase table — confirm.
}

于 2012-11-27T12:16:57.993 回答
0

**以下代码结合使用 Phoenix、HBase 和 MapReduce,对我来说可以正常工作**

该程序将从 Hbase 表中读取数据,并在 map-reduce 作业之后将结果插入到另一个表中。

表:STOCK、STOCK_STATS

StockComputationJob.java

/**
 * Emits, for each input {@code StockWritable}, the stock name paired with the largest
 * value found in its recordings array.
 */
public static class StockMapper extends Mapper<NullWritable, StockWritable, Text , DoubleWritable> {

    // Output holders; both are overwritten on every map() call before being written.
    private final Text stock = new Text();
    private final DoubleWritable price = new DoubleWritable();

    /**
     * Finds the maximum recording of one stock row and writes (stockName, maxRecording).
     *
     * @param key           input key; only used in the debug output
     * @param stockWritable row providing the stock name and its recordings
     * @param context       sink for the (stock name, max recording) pair
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    @Override
    protected void map(NullWritable key, StockWritable stockWritable, Context context) throws IOException, InterruptedException {
        final double[] recordings = stockWritable.getRecordings();
        final String stockName = stockWritable.getStockName();
        if (recordings == null || recordings.length == 0) {
            // A row without recordings has no maximum; emitting a sentinel would pollute the stats.
            return;
        }
        // Arrays.toString prints the values; concatenating the array itself prints only its identity.
        System.out.println("Map-" + java.util.Arrays.toString(recordings));
        // Double.MIN_VALUE is the smallest POSITIVE double, so it is wrong as a lower bound
        // whenever every recording is zero or negative. NEGATIVE_INFINITY is below every value.
        double maxPrice = Double.NEGATIVE_INFINITY;
        for (double recording : recordings) {
            System.out.println("M-" + key + "-" + recording);
            if (maxPrice < recording) {
                maxPrice = recording;
            }
        }
        System.out.println(stockName + "--" + maxPrice);
        stock.set(stockName);
        price.set(maxPrice);
        context.write(stock, price);
    }

}

    /**
     * Configures and runs the "stock-stats-job": reads rows from the Phoenix table STOCK,
     * runs StockMapper and StockReducer, and writes the results to STOCK_STATS.
     * The JVM exit status is 0 when the job succeeds and 1 when it fails.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if job setup or execution fails
     */
    public static void main(String[] args) throws Exception {

        final Configuration conf = new Configuration();
        HBaseConfiguration.addHbaseResources(conf);
        // zkUrl is not declared in this method; it must be provided by the enclosing class.
        conf.set(HConstants.ZOOKEEPER_QUORUM, zkUrl);
        final Job job = Job.getInstance(conf, "stock-stats-job");
        // We can either specify a selectQuery or omit it when we want to retrieve all the columns.
        final String selectQuery = "SELECT STOCK_NAME,RECORDING_YEAR,RECORDINGS_QUARTER FROM STOCK ";

        // StockWritable is the DBWritable class that lets us process the result of the query above.
        PhoenixMapReduceUtil.setInput(job, StockWritable.class, "STOCK", selectQuery);

        // Set the target Phoenix table and the columns.
        PhoenixMapReduceUtil.setOutput(job, "STOCK_STATS", "STOCK_NAME,MAX_RECORDING");

        job.setMapperClass(StockMapper.class);
        job.setReducerClass(StockReducer.class);
        job.setOutputFormatClass(PhoenixOutputFormat.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(StockWritable.class);
        TableMapReduceUtil.addDependencyJars(job);

        // waitForCompletion returns false on job failure; surface that through the exit
        // status instead of discarding it, so schedulers and scripts can detect the failure.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}

StockReducer.java

    public class StockReducer extends Reducer<Text, DoubleWritable, NullWritable , StockWritable> {

     protected void reduce(Text key, Iterable<DoubleWritable> recordings, Context context) throws IOException, InterruptedException {
          double maxPrice = Double.MIN_VALUE;
          System.out.println(recordings);
          for(DoubleWritable recording : recordings) {
              System.out.println("R-"+key+"-"+recording);
            if(maxPrice < recording.get()) {
             maxPrice = recording.get(); 
            }
          } 
          final StockWritable stock = new StockWritable();
          stock.setStockName(key.toString());
          stock.setMaxPrice(maxPrice);
          System.out.println(key+"--"+maxPrice);
          context.write(NullWritable.get(),stock);
        }


}

StockWritable.java

public class StockWritable  implements DBWritable,Writable {

      private String stockName;

        private int year;

        private double[] recordings;

        private double maxPrice;   

        public void readFields(DataInput input) throws IOException {

        }

        public void write(DataOutput output) throws IOException {

        }

        public void readFields(ResultSet rs) throws SQLException {
           stockName = rs.getString("STOCK_NAME");
           setYear(rs.getInt("RECORDING_YEAR"));
           final Array recordingsArray = rs.getArray("RECORDINGS_QUARTER");
           setRecordings((double[])recordingsArray.getArray());
        }

        public void write(PreparedStatement pstmt) throws SQLException {
           pstmt.setString(1, stockName);
           pstmt.setDouble(2, maxPrice); 
        }

        public int getYear() {
            return year;
        }

        public void setYear(int year) {
            this.year = year;
        }

        public double[] getRecordings() {
            return recordings;
        }

        public void setRecordings(double[] recordings) {
            this.recordings = recordings;
        }

        public double getMaxPrice() {
            return maxPrice;
        }

        public void setMaxPrice(double maxPrice) {
            this.maxPrice = maxPrice;
        }

        public String getStockName() {
            return stockName;
        }

        public void setStockName(String stockName) {
            this.stockName = stockName;
        }


}
于 2018-12-04T11:02:19.897 回答