6

Is there an elegant, easy and fast way to move data out of Hive into MongoDB?

4

3 回答 3

2

您可以使用 Hadoop-MongoDB 连接器进行导出:只需在作业的 main 方法中运行 Hive 查询,然后由 Mapper 读取该查询的输出,并将数据插入 MongoDB。

例子:

在这里,我使用一个简单的 Hive 查询将分号分隔的文本文件 ( id;firstname;lastname ) 插入到 MongoDB 集合中:

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.mongodb.hadoop.MongoOutputFormat;
import com.mongodb.hadoop.io.BSONWritable;
import com.mongodb.hadoop.util.MongoConfigUtil;

/**
 * Copies the rows of the Hive table {@code users} into a MongoDB collection.
 *
 * <p>The work happens in two steps:
 * <ol>
 *   <li>{@link #main(String[])} runs a Hive query over JDBC that dumps the
 *       table into an HDFS staging directory ({@code /user/hive/tmp}).</li>
 *   <li>{@link #run(String[])} submits a map-only job that reads the dumped
 *       text files and writes one document per row through
 *       {@code MongoOutputFormat}.</li>
 * </ol>
 *
 * <p>Each exported row is expected to have the columns
 * {@code id, firstname, lastname} in that order.
 */
public class HiveToMongo extends Configured implements Tool {

    /**
     * Turns one exported Hive row into an {@code (id, {firstname, lastname})}
     * pair for the MongoDB output format.
     */
    private static class HiveToMongoMapper extends
            Mapper<LongWritable, Text, IntWritable, BSONWritable> {

        // Column separator used for the rows written by INSERT OVERWRITE DIRECTORY.
        //See: https://issues.apache.org/jira/browse/HIVE-634
        private static final String HIVE_EXPORT_DELIMETER = '\001' + "";

        // id, firstname, lastname
        private static final int EXPECTED_FIELDS = 3;

        // Reused across calls; context.write() is handed the current value each time.
        private final IntWritable k = new IntWritable();

        /**
         * Parses a single exported line and emits it as a BSON document.
         *
         * @param key     byte offset of the line in the input file
         * @param value   the exported row, columns separated by
         *                {@link #HIVE_EXPORT_DELIMETER}
         * @param context job context used to emit the output pair
         * @throws IOException if the row has fewer than three columns or its
         *                     first column is not an integer
         */
        @Override
        public void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {

            // A limit of -1 keeps trailing empty columns; without it a row whose
            // lastname is empty would yield only two elements.
            String[] split = value.toString().split(HIVE_EXPORT_DELIMETER, -1);

            if (split.length < EXPECTED_FIELDS) {
                throw new IOException("Malformed Hive row at offset " + key.get()
                        + ": expected at least " + EXPECTED_FIELDS
                        + " fields but found " + split.length);
            }

            try {
                k.set(Integer.parseInt(split[0]));
            }
            catch (NumberFormatException e) {
                throw new IOException("Malformed Hive row at offset " + key.get()
                        + ": id is not an integer: '" + split[0] + "'", e);
            }

            // A fresh document per row so no fields leak from the previous record.
            BSONWritable v = new BSONWritable();
            v.put("firstname", split[1]);
            v.put("lastname", split[2]);
            context.write(k, v);

        }
    }

    /**
     * Entry point: exports the Hive table to HDFS, then launches the job.
     * Exits with status 1 if the Hive driver cannot be loaded or the export
     * query fails; otherwise exits with the status returned by {@link #run}.
     *
     * @param args command-line arguments, forwarded to {@code ToolRunner}
     */
    public static void main(String[] args) throws Exception {
        try {
            Class.forName("org.apache.hadoop.hive.jdbc.HiveDriver");
        }
        catch (ClassNotFoundException e) {
            System.out.println("Unable to load Hive Driver");
            System.exit(1);
        }

        try {
            exportUsersToStagingDirectory();
        }
        catch (SQLException e) {
            // Report the cause instead of exiting silently.
            System.err.println("Hive export failed: " + e);
            System.exit(1);
        }

        int res = ToolRunner.run(new Configuration(), new HiveToMongo(), args);
        System.exit(res);
    }

    /**
     * Runs the Hive query that overwrites the HDFS staging directory with the
     * contents of the {@code users} table, closing the JDBC statement and
     * connection afterwards.
     *
     * @throws SQLException if connecting, executing the query or closing fails
     */
    private static void exportUsersToStagingDirectory() throws SQLException {
        String sql = "INSERT OVERWRITE DIRECTORY " +
                "'hdfs://localhost:8020/user/hive/tmp' select * from users";

        Connection con = DriverManager.getConnection(
            "jdbc:hive://localhost:10000/default");
        try {
            Statement stmt = con.createStatement();
            try {
                stmt.executeQuery(sql);
            }
            finally {
                stmt.close();
            }
        }
        finally {
            con.close();
        }
    }

    /**
     * Configures and submits the map-only job that loads the staged Hive
     * output into MongoDB.
     *
     * <p>The job is only submitted, not awaited, so the return value does not
     * reflect whether the job eventually succeeds.
     *
     * @param args unused
     * @return always 0 once the job has been submitted
     */
    @Override
    public int run(String[] args) throws Exception {

        Configuration conf = getConf();
        Path inputPath = new Path("/user/hive/tmp");
        String mongoDbPath = "mongodb://127.0.0.1:6900/mongo_users.mycoll";
        MongoConfigUtil.setOutputURI(conf, mongoDbPath);

        /*
        Add dependencies to distributed cache via 
        DistributedCache.addFileToClassPath(...) :
        - mongo-hadoop-core-x.x.x.jar
        - mongo-java-driver-x.x.x.jar
        - hive-jdbc-x.x.x.jar
        HadoopUtils is an own utility class
        */
        HadoopUtils.addDependenciesToDistributedCache("/libs/mongodb", conf);
        HadoopUtils.addDependenciesToDistributedCache("/libs/hive", conf);

        Job job = new Job(conf, "HiveToMongo");

        FileInputFormat.setInputPaths(job, inputPath);
        job.setJarByClass(HiveToMongo.class);
        job.setMapperClass(HiveToMongoMapper.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(MongoOutputFormat.class);
        // Must match the types HiveToMongoMapper emits.
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(BSONWritable.class);
        // Map-only: mapper output goes straight to the output format.
        job.setNumReduceTasks(0);

        job.submit();
        System.out.println("Job submitted.");
        return 0;
    }
}

一个缺点是需要一个“暂存区”(/user/hive/tmp)来存储中间 Hive 输出。此外,据我所知,Mongo-Hadoop 连接器不支持 upserts。

我不太确定,但您也可以尝试不运行 hiveserver(即对外公开 Thrift 服务的进程),直接从 Hive 中获取数据,这样可以节省一些开销。可以查看 Hive 中实际执行查询的方法 org.apache.hadoop.hive.cli.CliDriver#processLine(String line, boolean allowInterupting) 的源代码,然后拼凑出类似下面的代码:

...
LogUtils.initHiveLog4j();
CliSessionState ss = new CliSessionState(new HiveConf(SessionState.class));
ss.in = System.in;
ss.out = new PrintStream(System.out, true, "UTF-8");
ss.err = new PrintStream(System.err, true, "UTF-8");
SessionState.start(ss);

Driver qp = new Driver();
processLocalCmd("SELECT * from users", qp, ss); //taken from CliDriver
...

旁注:

您还可以看一下已有的 hive-mongo 连接器实现;如果您打算为 MongoDB 自行实现类似的连接器,它可以作为参考。

于 2012-09-16T00:18:31.300 回答
1

你看过Sqoop吗?它应该使在 Hadoop 和 SQL/NoSQL 数据库之间移动数据变得非常简单。本文还提供了一个将其与 Hive 一起使用的示例。

于 2012-09-12T17:29:47.547 回答
1

看一下hadoop-MongoDB连接器项目:

http://api.mongodb.org/hadoop/MongoDB%2BHadoop+Connector.html

“这种连接的形式既允许将 MongoDB 数据读入 Hadoop(用于 MapReduce 作业以及 Hadoop 生态系统的其他组件),也允许将 Hadoop 作业的结果写入 MongoDB。”

不确定它是否适用于您的用例,但值得一看。

于 2012-09-14T16:01:36.370 回答