1

我想编写从 HDFS 读取输入,使用 MapReduce 处理它并将输出写入 MongoDb 的 Java 程序。

这是场景:

  1. 我有一个 Hadoop 集群,它有 3 个数据节点。
  2. 一个 java 程序从 HDFS 读取输入,使用 MapReduce 处理它。
  3. 最后,将结果写入 MongoDb。

实际上,从 HDFS 读取并使用 MapReduce 处理它很简单。但是我被困在将结果写入 MongoDb 中。是否有任何 Java API 支持将结果写入 MongoDB?另一个问题是,由于它是一个 Hadoop Cluster,所以我们不知道哪个 datanode 将运行 Reducer 任务并生成结果,是否可以将结果写入安装在特定服务器上的 MongoDb 中?

如果我想将结果写入 HDFS,代码将是这样的:

@Override
public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException 
{
    long sum = 0;
    for (LongWritable value : values) 
    {
        sum += value.get();
    }

    context.write(new Text(key), new LongWritable(sum));
}

现在我想将结果写入 MongoDb 而不是 HDFS,我该怎么做?

4

3 回答 3

2

你想要«MongoDB 的 Hadoop 连接器»。的例子

很容易在 Reducer 中添加代码,作为副作用,将数据插入数据库。避免这种诱惑。使用连接器而不是仅仅插入数据作为 reducer 类的副作用的一个原因是推测执行:Hadoop 有时可以并行运行两个完全相同的 reduce 任务,这可能导致无关的插入和重复数据。

于 2013-12-30T07:03:12.040 回答
0

是的。你像往常一样写信给 mongo。您的 mongo db 设置为在分片上运行这一事实是一个对您隐藏的细节。

于 2014-10-30T01:17:34.867 回答
0

我花了一上午的时间来实现同样的场景。这是我的解决方案:

创建三个类:

  • Experiment.java:用于作业配置和提交
  • MyMap.java:映射器类
  • MyReduce.java:reducer

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import com.mongodb.hadoop.io.BSONWritable;
    import com.mongodb.hadoop.mapred.MongoOutputFormat;
    
    public class Experiment extends Configured implements Tool{
    
         public int run(final String[] args) throws Exception {
            final Configuration conf = getConf();
            conf.set("mongo.output.uri", args[1]);
    
            final JobConf job = new JobConf(conf);
    
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            job.setJarByClass(Experiment.class);
    
            job.setInputFormat(org.apache.hadoop.mapred.TextInputFormat.class);
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);
            job.setOutputFormat(MongoOutputFormat.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(BSONWritable.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
    
            JobClient.runJob(job);
    
            return 0;
        }
    
        public static void main(final String[] args) throws Exception{
    
            int res = ToolRunner.run(new TweetPerUserToMongo(), args);
            System.exit(res);
        }
    }
    

当您从集群运行 Experiment 类时,您将输入两个参数。第一个参数是来自 HDFS 位置的输入源,第二个参数是指将保留结果的 mongodb URI。这是一个示例调用。假设您的 Experiment.java 位于包名 org.example 下。

sudo -u hdfs hadoop jar ~/jar/myexample.jar org.example.Experiment myfilesinhdfs/* mongodb://192.168.0.1:27017/mydbName.myCollectionName

这可能不是最好的方法,但它对我有用。

于 2015-01-24T20:37:19.297 回答