2

我确实有一个包含超过十亿个对象的表,如下所示:

{
    "_id" : ObjectId("5893ae4f7a5449caebba5848"),
    "id" : NumberLong(1234567890),
    "inserted_at" : ISODate("2017-02-02T22:10:23.812Z")
}

它保存在 mongodb 3.2.11 上。

我每周插入近 5000 万条新记录,并且需要将新一周的记录与前一周的记录进行比较。

因此,我建立了一个这样的查询:

db.getCollection('table').aggregate(
   [
        {"$group" : {
            "_id": {
                "year": { "$year": "$inserted_at"},
                "week": { "$week": "$inserted_at"}},
            "Content_IDs": { "$push": "$id"}}},
        { "$sort": {'_id.year': -1, '_id.week': -1}},
        { "$limit": 2},



        { "$group": {
             "_id": null,
             "Last": { $first: "$Content_IDs" },
             "Previous": { $last: "$Content_IDs"}
        }},

        { $project: {
            "Outgoing": { $setDifference: [ "$Previous", "$Last" ] },
            "Incoming": { $setDifference: [ "$Last", "$Previous" ] }}},
   ],
   {allowDiskUse:true}
)

但是由于数据的大小,mongodb 无法计算结果。

错误如下:

断言:命令失败:{“ok”:0,“errmsg”:“BufBuilder 试图增长()到 134217728 字节,超过 64MB 限制。”,“代码”:13548 }

我试图将所有记录提取到 Python env 中,以便在那里计算结果,但是当我在 pymongo.aggregate 上运行以下管道时遇到了同样的错误:

[
            {"$group" : {
                "_id": {
                    "year": { "$year": "$inserted_at"},
                    "week": { "$week": "$inserted_at"}},
                "Content_IDs": { "$push": "$id"}}},
            { "$sort": SON([('_id.year', -1), ('_id.week', -1)])},
            { "$limit": 2}
        ]

它适用于较小规模的数据。如何使此查询更具可扩展性?有什么建议么?

非常感谢!

4

1 回答 1

0

我面临着类似的问题,我开始使用 Spark(和 Scala)来操作我的数据。内存限制不大,可以在集群中使用,比nodejs快。

于 2017-02-16T02:43:30.470 回答