12

样本文件:

{ time: ISODate("2013-10-10T20:55:36Z"), value: 1 }
{ time: ISODate("2013-10-10T22:43:16Z"), value: 2 }
{ time: ISODate("2013-10-11T19:12:66Z"), value: 3 }
{ time: ISODate("2013-10-11T10:15:38Z"), value: 4 }
{ time: ISODate("2013-10-12T04:15:38Z"), value: 5 }

很容易获得按日期分组的汇总结果。但我想要的是查询返回聚合运行总数的结果,例如:

{ time: "2013-10-10" total: 3, runningTotal: 3  }
{ time: "2013-10-11" total: 7, runningTotal: 10 }
{ time: "2013-10-12" total: 5, runningTotal: 15 }

MongoDB 聚合可以做到这一点吗?

4

4 回答 4

12

这可以满足您的需要。我已经对数据中的时间进行了标准化,因此它们组合在一起(你可以做这样的事情。这个想法是将'和'$group推入单独的数组中。然后 是数组,并且您已经为每个文档制作了数组的副本。然后,您可以从包含不同时间所有数据的数组中计算(或类似滚动平均值)。生成的“索引”是对应的数组索引。before ing很重要,因为这可以确保数组的顺序正确。timetotal$unwindtimetotalstimerunningTotal$unwindtotaltime$sort$unwind

db.temp.aggregate(
    [
        {
            '$group': {
                '_id': '$time',
                'total': { '$sum': '$value' }
            }
        },
        {
            '$sort': {
                 '_id': 1
            }
        },
        {
            '$group': {
                '_id': 0,
                'time': { '$push': '$_id' },
                'totals': { '$push': '$total' }
            }
        },
        {
            '$unwind': {
                'path' : '$time',
                'includeArrayIndex' : 'index'
            }
        },
        {
            '$project': {
                '_id': 0,
                'time': { '$dateToString': { 'format': '%Y-%m-%d', 'date': '$time' }  },
                'total': { '$arrayElemAt': [ '$totals', '$index' ] },
                'runningTotal': { '$sum': { '$slice': [ '$totals', { '$add': [ '$index', 1 ] } ] } },
            }
        },
    ]
);

我在一个包含约 80 000 个文档的集合上使用了类似的东西,总计 63 个结果。我不确定它在更大的集合上的效果如何,但我发现一旦数据减少到可管理的大小,对聚合数据执行转换(投影、数组操作)似乎并没有很大的性能成本。

于 2018-01-13T21:44:16.250 回答
3

这是另一种方法

管道

db.col.aggregate([
    {$group : {
        _id : { time :{ $dateToString: {format: "%Y-%m-%d", date: "$time", timezone: "-05:00"}}},
        value : {$sum : "$value"}
    }},
    {$addFields : {_id : "$_id.time"}},
    {$sort : {_id : 1}},
    {$group : {_id : null, data : {$push : "$$ROOT"}}},
    {$addFields : {data : {
        $reduce : {
            input : "$data",
            initialValue : {total : 0, d : []},
            in : {
                total : {$sum : ["$$this.value", "$$value.total"]},                
                d : {$concatArrays : [
                        "$$value.d",
                        [{
                            _id : "$$this._id",
                            value : "$$this.value",
                            runningTotal : {$sum : ["$$value.total", "$$this.value"]}
                        }]
                ]}
            }
        }
    }}},
    {$unwind : "$data.d"},
    {$replaceRoot : {newRoot : "$data.d"}}
]).pretty()

收藏

> db.col.find()
{ "_id" : ObjectId("4f442120eb03305789000000"), "time" : ISODate("2013-10-10T20:55:36Z"), "value" : 1 }
{ "_id" : ObjectId("4f442120eb03305789000001"), "time" : ISODate("2013-10-11T04:43:16Z"), "value" : 2 }
{ "_id" : ObjectId("4f442120eb03305789000002"), "time" : ISODate("2013-10-12T03:13:06Z"), "value" : 3 }
{ "_id" : ObjectId("4f442120eb03305789000003"), "time" : ISODate("2013-10-11T10:15:38Z"), "value" : 4 }
{ "_id" : ObjectId("4f442120eb03305789000004"), "time" : ISODate("2013-10-13T02:15:38Z"), "value" : 5 }

结果

{ "_id" : "2013-10-10", "value" : 3, "runningTotal" : 3 }
{ "_id" : "2013-10-11", "value" : 7, "runningTotal" : 10 }
{ "_id" : "2013-10-12", "value" : 5, "runningTotal" : 15 }
> 
于 2018-02-24T13:39:24.263 回答
2

这是一个解决方案,无需将以前的文档推送到新数组中然后对其进行处理。(如果数组变得太大,那么您可能会超过最大 BSON 文档大小限制,即 16MB。)

计算运行总数很简单:

db.collection1.aggregate(
[
  {
    $lookup: {
      from: 'collection1',
      let: { date_to: '$time' },
      pipeline: [
        {
          $match: {
            $expr: {
              $lt: [ '$time', '$$date_to' ]
            }
          }
        },
        {
          $group: {
            _id: null,
            summary: {
              $sum: '$value'
            }
          }
        }
      ],
      as: 'sum_prev_days'
    }
  },
  {
    $addFields: {
      sum_prev_days: {
        $arrayElemAt: [ '$sum_prev_days', 0 ]
      }
    }
  },
  {
    $addFields: {
      running_total: {
        $sum: [ '$value', '$sum_prev_days.summary' ]
      }
    }
  },
  {
    $project: { sum_prev_days: 0 }
  }
]
)

我们做了什么:在查找中,我们选择了所有具有较小日期时间的文档并立即计算总和(使用 $group 作为查找管道的第二步)。$lookup 将值放入数组的第一个元素中。我们拉取第一个数组元素,然后计算总和:当前值 + 先前值的总和。

如果您想将事务分组为天,然后计算运行总计,那么我们需要将 $group 插入到开头,并将其插入到 $lookup 的管道中。

db.collection1.aggregate(
[
  {
    $group: {
      _id: {
        $substrBytes: ['$time', 0, 10]
      },
      value: {
        $sum: '$value'
      }
    }
  },
  {
    $lookup: {
      from: 'collection1',
      let: { date_to: '$_id' },
      pipeline: [
        {
          $group: {
            _id: {
              $substrBytes: ['$time', 0, 10]
            },
            value: {
              $sum: '$value'
            }
          }
        },
        {
          $match: {
            $expr: {
              $lt: [ '$_id', '$$date_to' ]
            }
          }
        },
        {
          $group: {
            _id: null,
            summary: {
              $sum: '$value'
            }
          }
        }
      ],
      as: 'sum_prev_days'
    }
  },
  {
    $addFields: {
      sum_prev_days: {
        $arrayElemAt: [ '$sum_prev_days', 0 ]
      }
    }
  },
  {
    $addFields: {
      running_total: {
        $sum: [ '$value', '$sum_prev_days.summary' ]
      }
    }
  },
  {
    $project: { sum_prev_days: 0 }
  }
]
)

结果是:

{ "_id" : "2013-10-10", "value" : 3, "running_total" : 3 }
{ "_id" : "2013-10-11", "value" : 7, "running_total" : 10 }
{ "_id" : "2013-10-12", "value" : 5, "running_total" : 15 }
于 2019-08-07T12:16:01.330 回答
0

从 开始,它是新聚合运算符Mongo 5的完美用例:$setWindowFields

// { time: ISODate("2013-10-10T20:55:36Z"), value: 1 }
// { time: ISODate("2013-10-10T22:43:16Z"), value: 2 }
// { time: ISODate("2013-10-11T12:12:66Z"), value: 3 }
// { time: ISODate("2013-10-11T10:15:38Z"), value: 4 }
// { time: ISODate("2013-10-12T05:15:38Z"), value: 5 }
db.collection.aggregate([

  { $group: {
    _id: { $dateToString: { format: "%Y-%m-%d", date: "$time" } },
    total: { $sum: "$value" }
  }},
  // e.g.: { "_id" : "2013-10-11", "total" : 7 }

  { $set: { "date": "$_id" } }, { $unset: ["_id"] },
  // e.g.: { "date" : "2013-10-11", "total" : 7 }

  { $setWindowFields: {
    sortBy: { date: 1 },
    output: {
      running: {
        $sum: "$total",
        window: { documents: [ "unbounded", "current" ] }
      }
    }
  }}
])
// { date: "2013-10-11", total: 7, running: 7 }
// { date: "2013-10-10", total: 3, running: 10 }
// { date: "2013-10-12", total: 5, running: 15 }

让我们关注以下$setWindowFields阶段:

  • 按日期按时间顺序$sort分组的文档:sortBy: { date: 1 }
  • running在每个文档中添加字段 ( output: { running: { ... }})
  • 这是$sums total( $sum: "$total")
  • 在指定的文档范围内 (the window)
    • 在我们的案例中,这是任何以前的文件:window: { documents: [ "unbounded", "current" ] } }
    • 根据含义定义,窗口是在第一个文档 ( ) 和当前文档 ( )[ "unbounded", "current" ]之间看到的所有文档。unboundedcurrent
于 2021-11-27T15:45:44.450 回答