1

我有以下格式的数据..

Type C_ID Assitor CollectionDate 粒度计数器

10 08-08-2012 00:00 15 0.9912378

乙 5 08-08-2012 00:00 15 0.1860929

C b 4 08-08-2012 00:00 15 0.5345317

直流 1 08-08-2012 00:15 15 0.8656529

E b 1 08-08-2012 00:15 15 0.3249502

10 08-08-2012 00:15 15 0.3743117

乙 5 08-08-2012 00:30 15 0.2608622

C b 4 08-08-2012 00:30 15 0.0079308

直流 1 08-08-2012 00:30 15 0.7094781

E b 1 08-08-2012 00:45 15 0.6133461

10 08-08-2012 00:45 15 0.3035875

乙 5 08-08-2012 00:45 15 0.6093015

C b 4 08-08-2012 01:00 15 0.4104008

直流 1 08-08-2012 01:00 15 0.1687753

E b 1 08-08-2012 01:00 15 0.6627076

一个 10 08-08-2012 01:15 15 0.1901386 .....

等等..

我想每小时在这张表上做增量mapreduce。在CollectionDate上是显示记录何时到来的字段。但是我想要的所有这些代码都在c#.net中

我已经完成了 mapReduce,但问题是我每 15 分钟获得 3 条记录,每小时获得 12 条记录,1 小时后,这 15 条记录将减少..在接下来的 1 小时后,其余记录将在相同的基础上减少..

我只能在 c#.net 中获得这方面的帮助吗?自过去 20 天以来,我遇到了很大的麻烦。

它多余的csv文件..我从哪里得到记录..n使用c#插入mongodb..在mongodb中它看起来像这样:{“_id”:a324b2f89d2e98fa21f,“Type”:A,“C_ID”:a, “assitor”:10,“CollectionDate”:08-08-2012 00:00,“Granulity”:15,“Counter”:0.1901386 } {“_id”:a324b2f89d2e98a216f,“Type”:B,“C_ID”:a, “assitor”:10,“CollectionDate”:08-08-2012 00:00,“Granulity”:15,“Counter”:0.1233542 } {“_id”:a324b2f89d2e98a3f2c,“Type”:A,“C_ID”:b, “assitor”:12,“CollectionDate”:08-08-2012 00:15,“Granulity”:12,“Counter”:0.8134552 } {“_id”:a324b2f89d2e98b4e2d,“Type”:B,“C_ID”:b,“assitor”:12,“CollectionDate”:08-08-2012 00:15,“Granulity”:12,“Counter”:0.3218547 }

输出文件:{“_id”:a8f3e231d456a675b23c,“CollectionDate”:08-08-2012 00:00 “AvgCounter”:} {“_id”:a8f3e232456a675a42cd,“CollectionDate”:08-08-2012 01:00 “AvgCounter” {“_id”:a8f3e231d46a67a0b4d2,“CollectionDate”:08-08-2012 02:00“AvgCounter”:}

表示每小时汇总..

直到我做了什么...

private static void MapReduce(MongoDatabase db, String collName, BsonValue bsonValue, DateTime oldDateTime, DateTime newDateTime)
    {
        var collection = db.GetCollection<BsonDocument>(collName);
        Console.WriteLine(TotalReduction++); 
        String map = @"function() { 
                                var sample = this;
                                emit(sample.CollectionDate, {CID: sample.C_ID, count:1, CollectionTime: sample.CollectionDate});
                             }";
        String reduce = @"function(key, values) {
                                var result = {CID: '', count:0};
                                values.forEach(function(value){
                                    result.CID += value.CID;
                                    result.count += value.count;
                                    result.CollectionTime = value.CollectionTime;
                                });
                                return result;
                                }";
        var options = new MapReduceOptionsBuilder();
        IMongoQuery[] queries = { Query.EQ("CollectionTime", bsonValue) };
        options.SetOutput(MapReduceOutput.Inline);
        IMongoQuery query = Query.And(queries);
        var results = collection.MapReduce(queries[0], map, reduce);
        collection = db.GetCollection<BsonDocument>("MSS_REDUCE");
        IEnumerable<BsonDocument> bdoc = results.GetResultsAs<BsonDocument>();
        collection.InsertBatch<BsonDocument>(bdoc);
    }

谢谢拉维夏尔马

4

1 回答 1

0

所以,我还是有点不清楚你的数据集。我可以指出一件事,希望它会有所帮助......

在您的地图中,您不会发出要分组的实际日期,而是发出包含所有内容的日期。相反,您应该发出一个删除了分钟、秒和毫秒的密钥。此外,如果您打算按 _id 进行分组,您也需要发出它。

String map = @"function() { 
    var sample = this;
    var d = sample.CollectionDate;
    var newCollectionDate = new Date(d.getFullYear(), d.getMonth(), d.getDate(), d.getHours(), 0, 0, 0);
    emit({C_ID: sample.C_ID, CollectionDate: newCollectionDate}, {C_ID: sample.C_ID, Count: 1, CounterSum: sample.Counter, CounterAverage: 0, CollectionDate: newCollectionDate});
}";

然后,您的 reduce 函数需要跟踪值的计数和总和。

String reduce = @"function(key, values) {
    var result = {C_ID: key.C_ID, Count:0, CounterSum: 0, CounterAverage: 0, CollectionDate: key.CollectionDate};
    values.forEach(function(value){
        result.Count += value.Count;
        result.CounterSum += value.CounterSum;
    });
    return result;
}";

您还需要一个 finalize 方法来进行平均...

String finalize = @"function(key, value) {
    if(value.Count > 0) {
        value.CounterAverage = value.CounterSum / value.Count;
    }
    return value;
}";

希望这能让你到达你需要去的地方。

于 2012-09-05T22:16:58.470 回答