我在一些 CSV 文件中共有约 200 万条数据。我逐个读取这些文件,并通过 MapReduce 归约求和,得到如下结果:
{ "_id" : "08-08-2012 05:00", "value" : { "CollectionDate" : "08-08-2012 05:00", "count" : 20000 } }
{ "_id" : "08-08-2012 05:15", "value" : { "CollectionDate" : "08-08-2012 05:15", "count" : 20000 } }
{ "_id" : "08-08-2012 05:30", "value" : { "CollectionDate" : "08-08-2012 05:30", "count" : 20000 } }
{ "_id" : "08-08-2012 05:45", "value" : { "CollectionDate" : "08-08-2012 05:45", "count" : 20000 } }
{ "_id" : "08-08-2012 06:00", "value" : { "CollectionDate" : "08-08-2012 06:00", "count" : 20001 } }
{ "_id" : "08-08-2012 06:15", "value" : { "CollectionDate" : "08-08-2012 06:15", "count" : 20000 } }
{ "_id" : "08-08-2012 06:30", "value" : { "CollectionDate" : "08-08-2012 06:30", "count" : 20000 } }
....
{ "_id" : "08-08-2012 10:30", "value" : { "CollectionDate" : "08-08-2012 06:30", "count" : 20000 } }
{ "_id" : "08-08-2012 10:45", "value" : { "CollectionDate" : "08-08-2012 06:30", "count" : 20000 } }
{ "_id" : "08-08-2012 11:00", "value" : { "CollectionDate" : "08-08-2012 06:30", "count" : 20000 } }
获取此结果的源代码如下:
class DirectImport
{
private static bool forOnce = true;
/// <summary>
/// Imports every CSV file in c:\mongoDb\Final into a MongoDB collection named
/// after the file, and triggers an incremental map-reduce aggregation each
/// time the record's CollectionDate crosses an hour boundary (or jumps
/// backwards, which indicates a new file restarted the time range).
/// </summary>
static void Main(string[] args)
{
    Stopwatch stopwatch = new Stopwatch();
    bool reset = true;
    BsonString lastAggregatedDate = "";
    String lastCollName = "";

    MongoServer mongo = MongoServer.Create();
    mongo.Connect();
    Console.WriteLine("Connected"); Console.WriteLine();
    var db = mongo.GetDatabase("SampleData");

    using (mongo.RequestStart(db))
    {
        IEnumerable<String> collectionList = Directory.EnumerateFiles(@"c:\mongoDb\Final");
        DateTime oldDt = new DateTime(1, 1, 1);

        foreach (String coll in collectionList)
        {
            // Collection name = file name without directory or extension.
            String collName = Path.GetFileNameWithoutExtension(coll);
            string[] records = File.ReadAllLines(@"C:\mongoDB\Final\" + collName + ".csv");
            var newCollection = db.GetCollection<BsonDocument>(collName);

            // FIX: ensure the CollectionDate index once per collection.  The
            // original called IndexExists/CreateIndex inside the record loop,
            // i.e. an extra server round-trip for every single CSV row.
            String[] indexKeys = { "CollectionDate" };
            if (!newCollection.IndexExists(indexKeys))
                newCollection.CreateIndex(indexKeys);

            for (int l = 1; l < records.Length; l++) // l = 1 skips the CSV header row
            {
                string[] abc = records[l].Split(',');
                DateTime dt = Convert.ToDateTime(abc[5]); // abc[5] = CollectionDate

                BsonDocument book1 = new BsonDocument();
                book1.Add("_id", BsonString.Create(abc[1] + "_" + abc[2] + "_" + abc[5]));
                book1.Add("Base_Name", BsonString.Create(abc[0]));
                book1.Add("ObjectType", BsonString.Create(abc[1]));
                book1.Add("C_ID", BsonString.Create(abc[2]));
                book1.Add("AssociateDimension1", BsonString.Create(abc[3]));
                book1.Add("AssociateDimension2", BsonInt32.Create(abc[4]));
                book1.Add("CollectionDateHour", BsonInt32.Create(dt.Hour));
                book1.Add("CollectionDate", BsonString.Create(abc[5]));
                book1.Add("Granularity", BsonInt32.Create(abc[6]));
                book1.Add("Counter1_1", BsonDouble.Create(abc[7]));
                book1.Add("Counter1_2", BsonDouble.Create(abc[8]));
                book1.Add("Counter1_3", BsonDouble.Create(abc[9]));
                book1.Add("Counter1_4", BsonDouble.Create(abc[10]));
                book1.Add("Counter1_5", BsonDouble.Create(abc[11]));
                book1.Add("Counter1_6", BsonDouble.Create(abc[12]));
                book1.Add("Counter1_7", BsonDouble.Create(abc[13]));

                if (forOnce)
                {
                    // Very first record overall: seed the rolling state.
                    forOnce = false;
                    oldDt = dt;
                    lastCollName = collName;
                }

                newCollection.Insert(book1);

                // A date going backwards means a new file restarted the range.
                if (oldDt > dt)
                    Console.WriteLine(dt + ":::" + oldDt);

                // Aggregate once per elapsed hour.  FIX: the original tested
                // (dt - oldDt).Hours >= 1; TimeSpan.Hours is only the hours
                // *component* and wraps back to 0 at exactly 24 hours, so a
                // whole-day gap would never trigger.  TotalHours is the true
                // elapsed time.
                if ((dt - oldDt).TotalHours >= 1 || oldDt > dt)
                {
                    // FIX: Restart (not Start) so the printed RunTime covers
                    // only this aggregation instead of accumulating the
                    // elapsed time of every previous one.
                    stopwatch.Restart();
                    Console.WriteLine("Calling MAPReduce at: " + oldDt);
                    Console.WriteLine();

                    MapReduce(db, lastCollName, dt, lastAggregatedDate, reset);

                    lastCollName = collName;
                    lastAggregatedDate = BsonString.Create(abc[5]);
                    reset = false;
                    oldDt = dt;

                    stopwatch.Stop();
                    TimeSpan ts = stopwatch.Elapsed;
                    Console.WriteLine("RunTime: " + ts.Hours + ":" + ts.Minutes + ":" + ts.Seconds + ":" + ts.Milliseconds / 10);
                }
            }
        }
    }
    Console.Read();
    mongo.Disconnect();
}
/// <summary>
/// Runs an incremental map-reduce over <paramref name="collName"/> that counts
/// documents per CollectionDate value, merging the result into the
/// "MSS_REDUCE" collection (MapReduceOutput.Reduce re-reduces new results
/// against the existing output).  Only documents with
/// CollectionDate &gt;= <paramref name="lastAggregateDate"/> are scanned, so
/// each call aggregates just the slice inserted since the previous call.
/// </summary>
/// <param name="db">Database holding the source collection.</param>
/// <param name="collName">Name of the collection to aggregate.</param>
/// <param name="bsonValue">Currently unused; kept for caller compatibility
/// (the commented-out reset query used it as an upper bound).</param>
/// <param name="lastAggregateDate">Inclusive lower bound on CollectionDate.</param>
/// <param name="reset">Currently unused; kept for caller compatibility.</param>
private static void MapReduce(MongoDatabase db, String collName, BsonValue bsonValue, BsonString lastAggregateDate, bool reset)
{
    var collection = db.GetCollection<BsonDocument>(collName);
    Console.WriteLine(collName);

    // Emit one {date, count: 1} pair per document, keyed by CollectionDate...
    String map = @"function() {
emit(this.CollectionDate, {CollectionDate : this.CollectionDate, count: 1});
}";
    // ...and sum the counts for each key.
    String reduce = @"function(key, values) {
var result = {CollectionDate:key, count: 0};
values.forEach(function(value){
result.count += value.count;
});
return result;
}";

    var options = new MapReduceOptionsBuilder();
    options.SetOutput(MapReduceOutput.Reduce("MSS_REDUCE"));

    // FIX: the original built Query.And(queries) into an unused local and then
    // passed queries[0] anyway; with a single predicate the two are identical,
    // so pass the predicate directly and drop the dead array/variables.
    IMongoQuery query = Query.GTE("CollectionDate", lastAggregateDate);
    collection.MapReduce(query, map, reduce, options);
}
现在的问题是我需要的输出是:
{ "_id" : "08-08-2012 05:00", "value" : { "CollectionDate" : "08-08-2012 05:00", "count" : 20000 } }
{ "_id" : "08-08-2012 06:00", "value" : { "CollectionDate" : "08-08-2012 06:00", "count" : 80000 } }
{ "_id" : "08-08-2012 07:00", "value" : { "CollectionDate" : "08-08-2012 07:00", "count" : 80000 } }
{ "_id" : "08-08-2012 08:00", "value" : { "CollectionDate" : "08-08-2012 07:45", "count" : 80000 } }
{ "_id" : "08-08-2012 09:00", "value" : { "CollectionDate" : "08-08-2012 09:00", "count" : 80000 } }
{ "_id" : "08-08-2012 10:00", "value" : { "CollectionDate" : "08-08-2012 10:00", "count" : 80000 } }
{ "_id" : "08-08-2012 11:00", "value" : { "CollectionDate" : "08-08-2012 10:45", "count" : 80000 } }
现在我认为可以通过对 _id 键进行分组(例如按小时截断日期字符串)来完成,但是我不知道如何在 C# 中实现这一点并得到上述结果。请大家帮帮我。