0

我已经在 mongo 数据库中记录了来自信息系统的更改。每次设置或更改一组值时,都会在 mongo 数据库中保存一条记录。

变更集合采用以下形式:

{ "user_id": 1, "timestamp": { "date" : "2010-09-22 09:28:02", "timezone_type" : 3, "timezone" : "Europe/Paris" } }, "changes: { "fieldA": "valueA", "fieldB": "valueB", "fieldC": "valueC" } }
{ "user_id": 1, "timestamp": { "date" : "2010-09-24 19:01:52", "timezone_type" : 3, "timezone" : "Europe/Paris" } }, "changes: { "fieldA": "new_valueA", "fieldB": null, "fieldD": "valueD" } }
{ "user_id": 1, "timestamp": { "date" : "2010-10-01 11:11:02", "timezone_type" : 3, "timezone" : "Europe/Paris" } }, "changes: { "fieldD": "new_valueD" } }

当然,每个用户有数千条具有不同属性的记录,代表数百万条记录。我想要做的是在给定时间查看用户状态。例如,2010 年 9 月 30 日的 user_id 1 将是

fieldA: new_valueA
fieldC: valueC
fieldD: valueD

这意味着我需要将给定用户在给定日期之前的所有更改拼合到一条记录中。我可以直接在 mongo 中这样做吗?

编辑:我使用的是 2.0 版本的 mongodb,因此无法从聚合框架中受益。

编辑:听起来我已经找到了我的问题的答案。

var mapTimeAndChangesByUserId = function() { 
    var key = this.user_id;
    var value = { timestamp: this.timestamp.date, changes: this.changes };
    emit(key, value);
}

var reduceMergeChanges = function(user_id, changeset) {
    var mergeFunction = function(a, b) { for (var attr in b) a[attr] = b[attr]; };
    var result = {};

    changeset.forEach(function(e) { mergeFunction(result, e.changes); }); 

    return { timestamp: changeset.pop().timestamp, changes: result };
}

reduce 函数按它们来的顺序合并更改并返回结果。

db.user_change.mapReduce(
    mapTimeAndChangesByUserId, 
    reduceMergeChanges,
    { 
        out:   { inline: 1 },
        query: { user_id: 1, "timestamp.date": { $lt: "2010-09-30" } },
        sort:  { "timestamp.date": 1 }
    });
'results' : [
    "_id": 1,
    "value": {
        "timestamp": "2010-09-24 19:01:52",
        "changes": {
            "fieldA": "new_valueA",
            "fieldB": null,
            "fieldC": "valueC",
            "fieldD": "valueD"
        }
    }
]

这对我来说很好。

4

1 回答 1

1

你可以写一个 MR 来做到这一点。

由于这些字段很像标签,您可以在此处修改一个很好的计算标签的食谱示例:http: //cookbook.mongodb.org/patterns/count_tags/当然,您希望应用最新值而不是计数(假设因为这是您的问题不清楚)该领域。

所以让我们得到我们的地图功能:

map = function() {
    if (!this.changes) {
        // If there were not changes for some reason lets bail this record
        return;
    }

    // We iterate the changes
    for (index in this.changes) {
        emit(index /* We emit the field name */, this.changes[index] /* We emit the field value */);
    }
}

现在对于我们的减少:

reduce = function(values){
    // This part is dependant upon your input query. If you add a sort of 
    // date (ts) DESC then you will prolly want the first index (0) not the last as
    // gathered here by values.length
    return values[values.length];
}

这将针对类型的每个字段更改输出一个文档:

{
    _id: your_field_ie_fieldA,
    value: whoop
}

然后,您可以迭代(最有可能)行输出的结尾,并且,bam,您有您的更改。

这当然是一种实现方式,并非旨在完全按照您的应用程序运行,但这一切都取决于您处理的数据的大小;它可以非常接近地运行。

我不确定groupand是否distinct可以在此运行,但看起来可能:http ://docs.mongodb.org/manual/reference/method/db.collection.group/#db-collection-group但是我应该注意group 基本上是一个 MR 包装器,但您可以执行以下操作(就像上面的 MR 一样未经测试):

db.col.group( {
                   key: { 'changes.fieldA': 1, // the rest of the fields },
                   cond: { 'timestamp.date': { $gt: new Date( '01/01/2012' ) } },
                   reduce: function ( curr, result ) { },
                   initial: { }
                } )

但它确实需要您定义键,而不是仅仅以可编程的方式迭代它们(也许是更好的方法)。

于 2012-12-27T16:43:01.363 回答