I am trying to find an elegant way to do the following in MongoDB / Python. I have two collections: one containing a list of people and their attributes, and one containing a subset of those people, a 'population subset'. I want to run a map-reduce job to calculate some aggregate stats on the large list, but using only the people whose names appear in the population subset. Here is an example set of records:

master_list: [{ Name: Jim,  Age: 24 },
              { Name: Bill, Age: 38 },
              { Name: Mary, Age: 55 }]

subset : [{ Name: Jim },
          { Name: Mary }]

The idea is to calculate the average age using only the two of the three records in master_list that are listed in subset. I am aware that map_reduce in Mongo supports a query parameter, but it is not clear to me what the best way to handle this is, given that there are no joins. One option is to preprocess master_list and add an 'include' attribute flagging which records to use, then filter on that flag in the map_reduce query. That seems kludgy, though, and it leaves a permanent flag in my database, which is annoying for various reasons.

UPDATE

After reading the suggestions to embed the list in the query, I was able to get what I needed with the following:

map_reduce(mapper, reducer, out = {'merge': 'Stats'}, 
           finalize = finalizer, scope = {'atts': f},
           query = {'Name' : { '$in' : pop }})

Here pop is a Python list of names. Thanks!

1 Answer

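There are two ways to solve this in MongoDB.

  1. If your subset is fairly small, you can simply query the subset collection for all of its members and use the result of that query as the initial query for the map-reduce call.

  2. If your subset is very large, however, that may not be feasible. What you can do instead is simulate a join using two map-reduce calls with the "reduce" output option, so that both reduce into the same target collection. This creates an intermediate collection whose documents conceptually look like this: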

    {Name: Jim, Age: 24, inSubset: true}
    {Name: Bill, Age: 38, inSubset: false}
    {Name: Mary, Age: 55, inSubset: true}
    

    Finally, you can run a third map-reduce on this intermediate collection to average the ages of all documents that have inSubset: true.
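
Option 1 could be sketched roughly as below. This is only an illustration: names_query is a hypothetical helper, the plain dicts stand in for documents that would come from db.subset.find(), and the field name Name follows the example records in the question.

```python
def names_query(subset_docs, field="Name"):
    """Build an $in filter from the subset documents (hypothetical helper)."""
    # Deduplicate while keeping first-seen order so the filter stays small.
    names = list(dict.fromkeys(doc[field] for doc in subset_docs if field in doc))
    return {field: {"$in": names}}


# Stand-in for list(db.subset.find({}, {"Name": 1})); no server is needed here.
subset_docs = [{"Name": "Jim"}, {"Name": "Mary"}]
query = names_query(subset_docs)
print(query)

# With a live database the filter would be passed as the initial query, e.g.
# db.master.map_reduce(mapper, reducer, out={"merge": "Stats"}, query=query)
```

The whole filter has to fit in a single BSON document (16 MB), which is one reason this approach only suits fairly small subsets.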

Here is the code for option 2 (three map-reduce calls) in Python, using the pymongo driver:

from pymongo import MongoClient  # Connection was removed in PyMongo 3.0
from bson import ObjectId, Code

# Note: Collection.map_reduce (used below) was removed in PyMongo 4.0,
# so this script needs a PyMongo 3.x driver.
con = MongoClient(port=30000)  # add host/port here if different from default
db = con['test']    # or the database name you are using

# insert documents
db.master.insert_one({'_id': ObjectId(), 'Name': 'Jim', 'Age': 24})
db.master.insert_one({'_id': ObjectId(), 'Name': 'Bill', 'Age': 38})
db.master.insert_one({'_id': ObjectId(), 'Name': 'Mary', 'Age': 55})

db.subset.insert_one({'_id': ObjectId(), 'Name': 'Jim'})
db.subset.insert_one({'_id': ObjectId(), 'Name': 'Mary'})

# map function for master collection
mapf_master = Code(""" function () {
    emit(this.Name, {'age': this.Age, 'inSubset': false});
} """)

# map function for subset collection
mapf_subset = Code(""" function() {
    emit(this.Name, {'age': 0, 'inSubset': true});
} """)

# reduce function for both master and subset
reducef = Code(""" function(key, values) {
    var result = {'age': 0, 'inSubset': false};

    values.forEach( function(value) {
        result.age += value.age;
        result.inSubset = result.inSubset || value.inSubset;
    });

    return result;
} """)

# call map-reduce on master and subset (simulates a join)
db.master.map_reduce(mapf_master, reducef, out={'reduce': 'join'})
db.subset.map_reduce(mapf_subset, reducef, out={'reduce': 'join'})


# final map function for third map-reduce call
mapf_final = Code(""" function() {
    if (this.value.inSubset) {
        emit('total', {'age': this.value.age, 'count': 1});
    }
} """)

# final reduce function for third map-reduce call
reducef_final = Code(""" function(key, values) {
    var result = {'age': 0, 'count': 0};

    values.forEach( function(value) {
        result.age += value.age;
        result.count += value.count;
    });

    return result;
} """)


# final finalize function, calculates the average
finalizef_final = Code(""" function(key, value) {
    if (value.count > 0) {
        value.averageAge = value.age / value.count;
    }
    return value;
} """)


# call final map-reduce 
db.join.map_reduce(mapf_final, reducef_final, finalize=finalizef_final, out={'merge': 'result'})

The result collection looks like this (queried from the mongo shell):

> db.result.find()
{ "_id" : "total", "value" : { "age" : 79, "count" : 2, "averageAge" : 39.5 } }

The final average is stored in the value.averageAge field.
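
To see where 39.5 comes from, the same join-then-average logic can be traced in plain Python without a server. This is a rough sketch, not the MongoDB execution model: reduce_join mirrors reducef, the dict named join stands in for the intermediate collection, and the sample records are the ones from the question.

```python
from collections import defaultdict

master = [
    {"Name": "Jim", "Age": 24},
    {"Name": "Bill", "Age": 38},
    {"Name": "Mary", "Age": 55},
]
subset = [{"Name": "Jim"}, {"Name": "Mary"}]


def reduce_join(values):
    """Same merge rule as reducef: sum the ages, OR the inSubset flags."""
    result = {"age": 0, "inSubset": False}
    for value in values:
        result["age"] += value["age"]
        result["inSubset"] = result["inSubset"] or value["inSubset"]
    return result


# Emit phase: both collections emit under the shared key (the name).
emitted = defaultdict(list)
for doc in master:
    emitted[doc["Name"]].append({"age": doc["Age"], "inSubset": False})
for doc in subset:
    emitted[doc["Name"]].append({"age": 0, "inSubset": True})

# Reduce phase: one merged value per name, like the 'join' collection.
join = {name: reduce_join(values) for name, values in emitted.items()}

# Final pass: average the ages of the entries flagged as in the subset.
selected = [v["age"] for v in join.values() if v["inSubset"]]
average_age = sum(selected) / len(selected) if selected else None
print(join)
print(average_age)
```

One thing the trace makes visible: a name that appears in subset but not in master would still be counted, with an age of 0, so the average is only meaningful if every subset name exists in the master collection.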

answered 2012-09-14T03:59:50.323