0

如何在 MapReduce MongoDB 中编写 sql avg 函数?我尝试了以下方法,将值相加并除以计数。但问题是我在哪里做,在减少功能或完成功能?

例如:我有以下文件

{
  "_id" : ObjectId("511b7d1b3daee1b1446ecdfe"),
  "l_linenumber" : 1,
  "l_quantity" : 17,
  "l_extendedprice" : 21168.23,
  "l_discount" : 0.04,
  "l_tax" : 0.02,
  "l_returnflag" : "N",
  "l_linestatus" : "O",
  "l_shipdate" : ISODate("1996-03-13T03:00:00Z"),
  "l_commitdate" : ISODate("1996-02-12T03:00:00Z"),
  "l_receiptdate" : ISODate("1996-03-22T03:00:00Z"),
  "l_shipinstruct" : "DELIVER IN PERSON",
  "l_shipmode" : "TRUCK",
  "l_comment" : "blithely regular ideas caj",
}

SQL查询是:

select
    l_returnflag, 
    l_linestatus, 
    sum(l_quantity) as sum_qty,
    sum(l_extendedprice) as sum_base_price,
    sum(l_extendedprice*(1-l_discount)) as sum_disc_price,
    sum(l_extendedprice*(1-l_discount)*(1+l_tax)) as sum_charge,
    avg(l_quantity) as avg_qty, 
    avg(l_extendedprice) as avg_price,
    avg(l_discount) as avg_disc, 
    count(*) as count_order
from 
    lineitem
where 
    l_shipdate <= DATE_SUB('1998-12-01',INTERVAL 90 DAY)
group by 
    l_returnflag, 
    l_linestatus
order by 
    l_returnflag, 
    l_linestatus;

我做了这个mapreduce函数:

db.runCommand({
    mapreduce: "lineitem",
    map: function Map() {
            var dataInicial = new Date("Dec 1, 1998");
            var dataFinal = new Date();
            dataFinal.setDate(dataInicial.getDate()-90);
                if( this.l_shipdate<=dataFinal) {
                    emit(
                        {returnflag: this.l_returnflag, linestatus: this.l_linestatus}, 
                        {
                            sum_qty: this.l_quantity,
                            sum_base_price: this.l_extendedprice,
                            sum_disc_price: this.l_extendedprice*(1-this.l_discount),
                            sum_charge: this.l_extendedprice*(1-this.l_discount)*(1+this.l_tax),
                            avg_qty: this.l_quantity,
                            avg_price: this.l_extendedprice,
                            avg_disc: this.l_discount,
                            count_order: 1
                        }
                    );
                }
        },
    reduce: function(key, values) {
                var ret = {sum_qty: 0, sum_base_price: 0, sum_disc_price: 0, sum_charge: 0, 
                            avg_qty: 0, avg_price: 0, avg_disc: 0, count_order: 0};
                for (var i = 0; i < values.length; i++) {
                    ret.sum_qty += values[i].sum_qty;
                    ret.sum_base_price += values[i].sum_base_price;
                    ret.sum_disc_price += values[i].sum_disc_price;
                    ret.sum_charge += values[i].sum_charge;
                    ret.avg_qty += values[i].avg_qty;
                    ret.avg_price += values[i].avg_price;
                    ret.avg_disc += values[i].avg_disc;
                    ret.count_order += values[i].count_order;
                }
                return ret;
            },
finalize: function(key, value) {
                value.avg_qty = value.avg_qty/value.count_order;
                value.avg_price = value.avg_qty/value.count_order;
                value.avg_disc = value.avg_qty/value.count_order;
            return value;
        },
    out: 'query001'
});

avg_qty、avg_price、avg_disc 的答案不正确。怎么回事?或者将在reduce函数中进行求和和除数?

4

1 回答 1

0

以下是使用 MapReduce 的方法:

m = function (){
  emit( {returnflag: this.l_returnflag, linestatus: this.l_linestatus} , 
        { sum_base_price: this.l_extendedprice, count : 1 } );
};

r = function (name, values){
  var res = {sum_base_price : 0, count : 0};
  values.forEach (function (v) {
    res.sum_base_price += v[i].sum_base_price;
    res.count += v[i].count;
  }
  return res;
};

f = function(key, res){
  res.avg = res.sum_base_price / res.count;
  return res;
};

然后,调用 mapReduce:

db.lineitem.mapReduce( m, r, 
                   { finalize : f, 
                     out : {inline : 1}, 
                     query: {l_shipdate:{$lt:dataFinal}} 
                   }
);

因此,您不是在 map 函数中进行过滤,而是在调用 map 之前在查询中进行过滤(这样更有效)。

在聚合框架中,它将是:

db.lineitem.aggregate( [
   {$match: {l_shipdate:{$lt:dataFinal}},
   {$group: { _id: {returnflag: "$l_returnflag", linestatus: "$l_linestatus"},
              sum_base_price: {$sum:"$l_extended_price"},
              avg_base_price: {$avg:"$l_extended_price"},
              count: {$sum: 1}
            }
   }
])

根据需要添加其他字段...

于 2013-03-01T12:40:05.820 回答