
I need to implement an MR job that accesses data from both an HBase table and HDFS files. That is, the mapper reads data from an HBase table and from an HDFS file; the records share the same primary key but have different schemas. The reducer then joins all the columns (from the HBase table and the HDFS file) together.

I tried looking online but could not find a way to run an MR job with such mixed data sources. MultipleInputs seems to support only multiple HDFS data sources. Please let me know if you have any ideas. Sample code would be great.
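For concreteness, a hypothetical illustration of the two inputs and the desired output (file layout, table name, and values are made up):

HDFS file (one key,value pair per line):

    row1,hdfs_val1
    row2,hdfs_val2

HBase table, column cf:c1:

    row1 -> hbase_val1
    row2 -> hbase_val2

Desired joined output from the reducer:

    row1    hdfs_val1    hbase_val1
    row2    hdfs_val2    hbase_val2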


3 Answers


After a few days of investigation (and some help from the HBase user mailing list), I finally figured out how to do it. Here is the source code:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MixMR {

    // HDFS side: each input line is "key,value"
    public static class Map extends Mapper<Object, Text, Text, Text> {

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String s = value.toString();
            String[] sa = s.split(",");
            if (sa.length == 2) {
                context.write(new Text(sa[0]), new Text(sa[1]));
            }
        }
    }

    // HBase side: emit (row key, value of cf:c1)
    public static class TableMap extends TableMapper<Text, Text> {
        public static final byte[] CF = "cf".getBytes();
        public static final byte[] ATTR1 = "c1".getBytes();

        @Override
        public void map(ImmutableBytesWritable row, Result value, Context context)
                throws IOException, InterruptedException {
            String key = Bytes.toString(row.get());
            String val = Bytes.toString(value.getValue(CF, ATTR1));
            context.write(new Text(key), new Text(val));
        }
    }

    // Values from both sources arrive under the same key; write them all out
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text val : values) {
                context.write(key, val);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Path inputPath1 = new Path(args[0]);   // HDFS file
        Path inputPath2 = new Path(args[1]);   // placeholder path for the HBase input
        Path outputPath = new Path(args[2]);

        String tableName = "test";

        Configuration config = HBaseConfiguration.create();
        Job job = Job.getInstance(config, "ExampleRead");
        job.setJarByClass(MixMR.class);     // class that contains the mappers

        Scan scan = new Scan();
        scan.setCaching(500);        // 1 is the default in Scan, which is bad for MapReduce jobs
        scan.setCacheBlocks(false);  // don't set to true for MR jobs
        scan.addFamily(Bytes.toBytes("cf"));

        TableMapReduceUtil.initTableMapperJob(
                tableName,        // input HBase table name
                scan,             // Scan instance to control CF and attribute selection
                TableMap.class,   // mapper
                Text.class,       // mapper output key
                Text.class,       // mapper output value
                job);

        job.setReducerClass(Reduce.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // These calls must come after initTableMapperJob so that the
        // DelegatingInputFormat installed by MultipleInputs wins.
        // inputPath2 has no effect for the HBase table: TableInputFormat
        // reads the table configured above, not a file path.
        MultipleInputs.addInputPath(job, inputPath1, TextInputFormat.class, Map.class);
        MultipleInputs.addInputPath(job, inputPath2, TableInputFormat.class, TableMap.class);

        FileOutputFormat.setOutputPath(job, outputPath);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
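For reference, a hypothetical invocation (the jar name and paths are made up; the second path is only a placeholder, since the HBase input ignores it):

    hadoop jar mixmr.jar MixMR /user/me/input.csv /tmp/hbase_dummy /user/me/output

Each reducer output line pairs a key with one value from either source, so rows that share a key end up adjacent in the output.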

Answered 2013-07-12T15:56:02.563

There is no OOTB feature that supports this. A possible workaround is to scan the HBase table first and write the results to an HDFS file, then do the reduce-side join with MultipleInputs. But this incurs some extra I/O overhead.
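A minimal sketch of that pre-step, assuming the same 'test' table and cf:c1 column used in the accepted answer (the class name, output path, and separator choice are my own): a map-only job that dumps the table to key,value text on HDFS, after which both inputs are plain text files that MultipleInputs can read with TextInputFormat.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class TableToHdfs {

    public static class ExportMap extends TableMapper<Text, Text> {
        @Override
        public void map(ImmutableBytesWritable row, Result value, Context context)
                throws IOException, InterruptedException {
            // Emit row key -> value of cf:c1
            String key = Bytes.toString(row.get());
            String val = Bytes.toString(value.getValue(Bytes.toBytes("cf"), Bytes.toBytes("c1")));
            context.write(new Text(key), new Text(val));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration config = HBaseConfiguration.create();
        // Use a comma so the dump matches the key,value layout the join job
        // expects (TextOutputFormat's default separator is a tab)
        config.set("mapreduce.output.textoutputformat.separator", ",");

        Job job = Job.getInstance(config, "TableToHdfs");
        job.setJarByClass(TableToHdfs.class);

        Scan scan = new Scan();
        scan.setCaching(500);
        scan.setCacheBlocks(false);
        scan.addFamily(Bytes.toBytes("cf"));

        TableMapReduceUtil.initTableMapperJob(
                "test", scan, ExportMap.class, Text.class, Text.class, job);

        job.setNumReduceTasks(0);  // map-only: mapper output goes straight to HDFS
        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(args[0]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}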

Answered 2013-07-04T12:12:13.697

A Pig script or a Hive query can do this easily.

Sample Pig script:

tbl = LOAD 'hbase://SampleTable'
       USING org.apache.pig.backend.hadoop.hbase.HBaseStorage(
       'info:* ...', '-loadKey true -limit 5')
       AS (id:bytearray, info_map:map[],...);

fle = LOAD '/somefile' USING PigStorage(',') AS (id:bytearray,...);

joined = JOIN tbl BY id, fle BY id;
STORE joined INTO '...';

Answered 2013-07-04T15:00:25.040