
I want to insert the output of my MapReduce job into an HBase table using the HBase bulk loading API, LoadIncrementalHFiles.doBulkLoad(new Path(), hTable).

I am emitting KeyValue objects from my mapper and then using HFileOutputFormat with its default reducer to prepare the HFiles.
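For reference, the mapper follows the standard pattern for feeding HFileOutputFormat; below is a simplified sketch (the column family, qualifier, and line parsing are placeholders, not my exact code):

import java.io.IOException;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Simplified sketch of the mapper: the column family ("CF"), qualifier ("COUNT")
// and the "<word>\t<count>" input format are placeholders for illustration only.
public static class MyMap extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");
        byte[] rowKey = Bytes.toBytes(fields[0]);
        // Emit one KeyValue per cell; the row key is wrapped in ImmutableBytesWritable
        // so that HFileOutputFormat's partitioner and reducer can sort by row.
        KeyValue kv = new KeyValue(rowKey, Bytes.toBytes("CF"),
                Bytes.toBytes("COUNT"), Bytes.toBytes(fields[1]));
        context.write(new ImmutableBytesWritable(rowKey), kv);
    }
}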

When I run my MapReduce job, it completes without any errors and creates the output files. However, the final step, inserting the HFiles into HBase, does not happen. I get the warnings below after the MapReduce job completes:

13/09/08 03:39:51 WARN mapreduce.LoadIncrementalHFiles: Skipping non-directory hdfs://localhost:54310/user/xx.xx/output/_SUCCESS
13/09/08 03:39:51 WARN mapreduce.LoadIncrementalHFiles: Bulk load operation did not find any files to load in directory output/.  Does it contain files in subdirectories that correspond to column family names?

But I can see that the output directory contains:

1. _SUCCESS
2. _logs
3. _0/2aa96255f7f5446a8ea7f82aa2bd299e (the file that contains my data)

I have no clue why my bulk loader is not picking up the files from the output directory.

Below is the code of my MapReduce driver class:

public static void main(String[] args) throws Exception{

    String inputFile = args[0];
    String tableName = args[1];
    String outFile = args[2];
    Path inputPath = new Path(inputFile);
    Path outPath = new Path(outFile);

    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);

    //set the configurations
    conf.set("mapred.job.tracker", "localhost:54311");

    //Input data to HTable using Map Reduce
    Job job = new Job(conf, "MapReduce - Word Frequency Count");
    job.setJarByClass(MapReduce.class);

    job.setInputFormatClass(TextInputFormat.class);

    FileInputFormat.addInputPath(job, inputPath);

    fs.delete(outPath);
    FileOutputFormat.setOutputPath(job, outPath);

    job.setMapperClass(MapReduce.MyMap.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);

    HTable hTable = new HTable(conf, tableName.toUpperCase());

    // Auto configure partitioner and reducer
    HFileOutputFormat.configureIncrementalLoad(job, hTable);

    job.waitForCompletion(true);

    // Load generated HFiles into table
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    loader.doBulkLoad(new Path(outFile), hTable);

}

How can I figure out what is going wrong here and preventing my data from being inserted into HBase?


1 Answer


Finally, I figured out why my HFiles were not being dumped into HBase. Here are the details:

My CREATE statement DDL did not specify any column family names, so I am guessing Phoenix created the default column family as "_0". I could see this column family in my HDFS /hbase directory.

However, when I used HBase's LoadIncrementalHFiles API to pick up the files from my output directory, in my case it did not pick up the directory named after the column family ("_0"). I debugged the LoadIncrementalHFiles code and found that it skips every directory in the output path whose name starts with "_" (for example "_logs").
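You can see the same filtering with a small standalone listing of the job output directory. This is only an illustration of the behaviour I observed while debugging, not the actual HBase source:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ListFamilyDirs {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // List the children of the bulk-load output path: anything whose name
        // starts with "_" is ignored, which is why a "_0" family directory is skipped.
        for (FileStatus status : fs.listStatus(new Path(args[0]))) {
            String name = status.getPath().getName();
            if (!status.isDir() || name.startsWith("_")) {
                System.out.println("skipped: " + name);   // e.g. _SUCCESS, _logs, _0
            } else {
                System.out.println("would load family dir: " + name);
            }
        }
    }
}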

I tried the same thing again, but this time with some column families specified explicitly, and everything worked fine. I was able to query the data using Phoenix SQL.
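As a sanity check, one could also verify before calling doBulkLoad that the output path has a subdirectory for every column family of the target table. This is an illustrative helper meant to be dropped into the driver class above; the checkFamilyDirs name is mine:

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.util.Bytes;

// Illustrative helper (not part of my original job): fail fast if any column
// family of the target table has no matching HFile directory in the output path.
private static void checkFamilyDirs(FileSystem fs, Path outPath, HTable hTable) throws IOException {
    for (byte[] family : hTable.getTableDescriptor().getFamiliesKeys()) {
        Path familyDir = new Path(outPath, Bytes.toString(family));
        if (!fs.exists(familyDir)) {
            throw new IOException("No HFile directory for column family "
                    + Bytes.toString(family) + " under " + outPath);
        }
    }
}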

Answered 2013-09-08T23:52:25.787