0

我编写了一个 Spark 应用程序,该应用程序生成 HFile,以便稍后使用该LoadIncrementalHFiles命令进行批量加载。由于源数据池非常大,输入文件被拆分为一个接一个地处理的迭代。每次迭代都会创建自己的HFile目录,因此我的 HDFS 结构如下所示:

/user/myuser/map_data/hfiles_0
         ...         /hfiles_1
         ...         /hfiles_2
         ...         /hfiles_3
                     ...

map_data目录中大约有 500 个文件,因此我正在寻找一种自动调用该LoadIncrementalHFiles函数的方法,以便在以后的迭代中也处理这些子目录。

相应的命令是这样的:

hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles -Dcreate.table=no /user/myuser/map_data/hfiles_0 mytable

我需要将其更改为迭代命令,因为此命令不适用于子目录(当我使用/user/myuser/map_data目录调用它时)!

我尝试使用 JavaProcess实例自动执行上面的命令,但这并没有做任何事情(没有输出到控制台,也没有更多的行在我的 HBase 表中)。

从我的代码中使用org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFilesJava 类也不起作用,它也没有响应!

有没有人适合我的例子?或者是否有一个参数可以hbase在父目录上运行上述命令?我在 Hortonworks Data Platform 2.5 集群中使用 HBase 1.1.2。

编辑我尝试LoadIncrementalHFiles从 Hadoop 客户端 Java 应用程序运行命令,但我遇到了与 snappy 压缩相关的异常,请参阅从 Java 客户端运行 LoadIncrementalHFiles

4

1 回答 1

0

解决方案是将hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles -Dcreate.table=no /user/myuser/map_data/hfiles_0 mytable命令分成许多部分(每个命令部分一个),请参阅此 Java 代码片段:

TreeSet<String> subDirs = getHFileDirectories(new Path(HDFS_PATH), hadoopConf);

for(String hFileDir : subDirs) {

    try {
        String pathToReadFrom = HDFS_OUTPUT_PATH + "/" + hFileDir;
        ==> String[] execCode = {"hbase", "org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles", "-Dcreate.table=no", pathToReadFrom, hbaseTableName};
        ProcessBuilder pb = new ProcessBuilder(execCode);
        pb.redirectErrorStream(true);
        final Process p = pb.start();

        // Write the output of the Process to the console
        new Thread(new Runnable() {
            public void run() {
                BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream()));
                String line = null; 

                try {
                    while ((line = input.readLine()) != null)
                        System.out.println(line);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }).start();

    // Wait for the end of the execution
    p.waitFor();
    ...
}
于 2017-09-29T10:08:14.923 回答