
I am new to the big data and NoSQL space, and I am trying out a sample program.

I am trying to fetch details from my Mongo database. Below is my database schema:

  { "_id" : ObjectId("51d11c95e82449edcf7640bc"), "Called_Number" : NumberLong("7259400112"), "Calling_Number" : NumberLong("9008496311"), "Date" : "22-Apr-13", "Time" : "10:21:43", "Duration" : "4:36" }

Now I am trying to read the values from the database and run a map-reduce job on them, so that I can get details like the following:

  { "Calling_Number" : "7259400112", "Called_Number" : "9008496311", "Frequency" : "3" }

Here is what I am trying:

package callcircle;

import java.io.*;
import java.util.*;

import org.apache.commons.logging.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.bson.*;

import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.hadoop.*;
import com.mongodb.hadoop.io.BSONWritable;
import com.mongodb.hadoop.util.*;

public class call {

    private static final Log log = LogFactory.getLog(call.class);

    public static class TokenizerMapper extends
            Mapper<Object, Object, Text, IntWritable> {


        private final static IntWritable one = new IntWritable(1);
        private final Text word = new Text();


        public void map(Object calling_number, Object called_number,
                Context context) throws IOException, InterruptedException {
            System.out.println("entering method");


        //  calling_number = (Object) calling_number).get("Calling_Number");
            called_number = ((BSONWritable) called_number).get("Called_Number");

            String CallNumer01 = called_number.toString();

            String[] recips = CallNumer01.split(",");



            for (int i = 0; i < recips.length; i++) {
                String recip = recips[i].trim();
                if (recip.length() > 0) {


                    // context.write(new CallPair(calling_number, recip), new IntWritable(1));
                    // word.set(CallNumer01); context.write( word, one );

                    //System.out.println("After mapping");

                }
            }
        }
    }

    public class CallReducer extends
        Reducer<CallPair, IntWritable, BSONWritable, IntWritable> {

        public void reduce(final CallPair pKey,
                final Iterable<IntWritable> pValues, final Context pContext)
                throws IOException, InterruptedException {
            int sum = 0;
            for (final IntWritable value : pValues) {
                sum += value.get();
            }
            @SuppressWarnings("static-access")
            BSONObject outDoc = new BasicDBObjectBuilder().start()
                    .add("f", pKey.calling_number).add("t", pKey.called_number)
                    .get();
            BSONWritable pkeyOut = new BSONWritable(outDoc);
            pContext.write(pkeyOut, new IntWritable(sum));
        }

    }



    public static void main(String[] args) throws Exception {
        System.out.println("In Main");
        final Configuration conf = new Configuration();
        System.out.println("Conf1: " + conf);
        MongoConfigUtil.setInputURI(conf, "mongodb://localhost/CDR.in1");
        MongoConfigUtil.setOutputURI(conf, "mongodb://localhost/CDR.out");
        System.out.println("Conf: " + conf);

        final Job job = new Job(conf, "CDR");

        job.setJarByClass(call.class);
        System.out.println("Conf2: " + conf);

        job.setMapperClass(TokenizerMapper.class);

        job.setCombinerClass(CallReducer.class);
        job.setReducerClass(CallReducer.class);
        System.out.println("Conf3: " + conf);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        System.out.println("Conf3: " + conf);
        job.setInputFormatClass(MongoInputFormat.class);
        job.setOutputFormatClass(MongoOutputFormat.class);
        System.out.println("Conf4: " + conf);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
        System.out.println("Conf6: " + conf);
    }

}

But I am getting the following error:

In Main
Conf1: Configuration: core-default.xml, core-site.xml
Conf: Configuration: core-default.xml, core-site.xml
13/07/01 19:04:27 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
Conf2: Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml
Conf3: Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml
Conf3: Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml
Conf4: Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml
13/07/01 19:04:27 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
13/07/01 19:04:27 WARN mapred.JobClient: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
13/07/01 19:04:28 INFO util.MongoSplitter:  Calculate Splits Code ... Use Shards? false, Use Chunks? true; Collection Sharded? false
13/07/01 19:04:28 INFO util.MongoSplitter: Creation of Input Splits is enabled.
13/07/01 19:04:28 INFO util.MongoSplitter: Using Unsharded Split mode (Calculating multiple splits though)
13/07/01 19:04:28 INFO util.MongoSplitter: Calculating unsharded input splits on namespace 'CDR.in1' with Split Key '{ "_id" : 1}' and a split size of '8'mb per
13/07/01 19:04:28 WARN util.MongoSplitter: WARNING: No Input Splits were calculated by the split code. Proceeding with a *single* split. Data may be too small, try lowering 'mongo.input.split_size' if this is undesirable.
13/07/01 19:04:28 INFO mapred.JobClient: Running job: job_local_0001
13/07/01 19:04:28 INFO util.MongoSplitter:  Calculate Splits Code ... Use Shards? false, Use Chunks? true; Collection Sharded? false
13/07/01 19:04:28 INFO util.MongoSplitter: Creation of Input Splits is enabled.
13/07/01 19:04:28 INFO util.MongoSplitter: Using Unsharded Split mode (Calculating multiple splits though)
13/07/01 19:04:28 INFO util.MongoSplitter: Calculating unsharded input splits on namespace 'CDR.in1' with Split Key '{ "_id" : 1}' and a split size of '8'mb per
13/07/01 19:04:28 WARN util.MongoSplitter: WARNING: No Input Splits were calculated by the split code. Proceeding with a *single* split. Data may be too small, try lowering 'mongo.input.split_size' if this is undesirable.
should setup context
13/07/01 19:04:28 INFO input.MongoInputSplit: Deserialized MongoInputSplit ... { length = 9223372036854775807, locations = [localhost], keyField = _id, query = { "$query" : { }}, fields = { }, sort = { }, limit = 0, skip = 0, noTimeout = false}
13/07/01 19:04:28 INFO mapred.MapTask: io.sort.mb = 100
13/07/01 19:04:28 INFO mapred.MapTask: data buffer = 79691776/99614720
13/07/01 19:04:28 INFO mapred.MapTask: record buffer = 262144/327680
entering method
13/07/01 19:04:28 WARN mapred.LocalJobRunner: job_local_0001
java.lang.ClassCastException: com.mongodb.BasicDBObject cannot be cast to com.mongodb.hadoop.io.BSONWritable
    at callcircle.call$TokenizerMapper.map(call.java:36)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:621)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:305)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:177)
13/07/01 19:04:29 INFO mapred.JobClient:  map 0% reduce 0%
13/07/01 19:04:29 INFO mapred.JobClient: Job complete: job_local_0001
13/07/01 19:04:29 INFO mapred.JobClient: Counters: 0

Could someone please point out where I am going wrong?

Thanks


1 Answer


If the mapper and reducer do not use the same output types, you have to specify the mapper key/value types explicitly - so you probably also need to add:

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
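Applied to the driver from the question, the relevant part of `main()` would look roughly like this (a sketch, assuming you keep the `TokenizerMapper` and `CallReducer` from your post; note that since your reducer is declared as `Reducer<CallPair, IntWritable, BSONWritable, IntWritable>`, the job-level output classes would then have to match its output types, not the mapper's):

```java
// Mapper and reducer emit different types, so declare both pairs explicitly.
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(CallReducer.class);

// Intermediate (map-side) types: what TokenizerMapper emits.
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

// Final (reduce-side) types: what CallReducer emits.
job.setOutputKeyClass(BSONWritable.class);
job.setOutputValueClass(IntWritable.class);
```

Also note that with differing types you can no longer reuse `CallReducer` as a combiner (`job.setCombinerClass(CallReducer.class)` in your code), because a combiner's output types must match the mapper's output types.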
Answered 2013-07-15T21:52:18.620
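Separately, the `ClassCastException` in your log points at the cast inside `map()`: with `MongoInputFormat`, the value handed to the mapper is a `BSONObject` (concretely a `BasicDBObject`, exactly as the exception says), not a `BSONWritable`, and the key is the document's `_id`. A sketch of the mapper under that assumption - the field names come from your schema, the rest is illustrative:

```java
public static class TokenizerMapper
        extends Mapper<Object, BSONObject, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private final Text word = new Text();

    @Override
    public void map(Object key, BSONObject value, Context context)
            throws IOException, InterruptedException {
        // Read fields directly from the BSON document - no cast needed.
        Object calling = value.get("Calling_Number");
        Object called = value.get("Called_Number");
        if (calling != null && called != null) {
            // Emit the (calling, called) pair as the key with a count of 1;
            // the reducer can then sum the counts to get the frequency.
            word.set(calling.toString() + "," + called.toString());
            context.write(word, one);
        }
    }
}
```

Declaring the value parameter as `BSONObject` (and `@Override`-ing the real `map` signature) avoids the failing cast entirely.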