我从服务器获得了一些时间间隔为 10 分钟的记录(在 1 小时内我将获得 6 个文件)我想在接下来的几个小时内每 1 小时执行一次 map reduce 我将不得不在最后一个文件上执行下一组的 map reduce小时文件我将如何解决这个问题?帮助我在过去 1 个月里混淆了 frm 谢谢 Sushil Kr Singh
1 回答
为了按小时汇总 10 分钟的日志文件,您可以在 map 函数中将每个日志文件的时间戳四舍五入到最接近的小时,并在 reduce 函数中按小时对结果进行分组。
这是一个从 mongo shell 说明这一点的小虚拟示例:
创建 100 个日志文件,每间隔 10 分钟,包含一个 0-10 之间的随机数,并将它们插入到
logs
数据库中的集合中:for (var i = 0; i < 100; i++) { d = new ISODate(); d.setMinutes(d.getMinutes() + i*10); r = Math.floor(Math.random()*11) db.logs.insert({timestamp: d, number: r}) }
要检查
logs
集合的外观,请发送类似 的查询db.logs.find().limit(3).pretty()
,其结果是:{ "_id" : ObjectId("50455a3570537f9433c1efb2"), "timestamp" : ISODate("2012-09-04T01:32:37.370Z"), "number" : 2 } { "_id" : ObjectId("50455a3570537f9433c1efb3"), "timestamp" : ISODate("2012-09-04T01:42:37.370Z"), "number" : 3 } { "_id" : ObjectId("50455a3570537f9433c1efb4"), "timestamp" : ISODate("2012-09-04T01:52:37.370Z"), "number" : 8 }
定义一个映射函数(在本例中称为
mapf
),将时间戳四舍五入到最接近的小时(向下舍入),用于发出键。发出值是该日志文件的编号。mapf = function () { // round down to nearest hour d = this.timestamp; d.setMinutes(0); d.setSeconds(0); d.setMilliseconds(0); emit(d, this.number); }
定义一个 reduce 函数,对所有发出的值(即数字)求和。
reducef = function (key, values) { var sum = 0; for (var v in values) { sum += values[v]; } return sum; }
现在对日志集合执行 map/reduce。这里的
out
参数指定我们要将结果写入hourly_logs
集合并将现有文档与新结果合并。这确保了稍后提交的日志文件(例如,在服务器故障或其他延迟之后)一旦出现在日志中就会包含在结果中。db.logs.mapReduce(mapf, reducef, {out: { merge : "hourly_logs" }})
最后,要查看结果,您可以查询一个简单的 find
hourly_logs
:db.hourly_logs.find() { "_id" : ISODate("2012-09-04T02:00:00Z"), "value" : 33 } { "_id" : ISODate("2012-09-04T03:00:00Z"), "value" : 31 } { "_id" : ISODate("2012-09-04T04:00:00Z"), "value" : 21 } { "_id" : ISODate("2012-09-04T05:00:00Z"), "value" : 40 } { "_id" : ISODate("2012-09-04T06:00:00Z"), "value" : 26 } { "_id" : ISODate("2012-09-04T07:00:00Z"), "value" : 26 } { "_id" : ISODate("2012-09-04T08:00:00Z"), "value" : 25 } { "_id" : ISODate("2012-09-04T09:00:00Z"), "value" : 46 } { "_id" : ISODate("2012-09-04T10:00:00Z"), "value" : 27 } { "_id" : ISODate("2012-09-04T11:00:00Z"), "value" : 42 } { "_id" : ISODate("2012-09-04T12:00:00Z"), "value" : 43 } { "_id" : ISODate("2012-09-04T13:00:00Z"), "value" : 35 } { "_id" : ISODate("2012-09-04T14:00:00Z"), "value" : 22 } { "_id" : ISODate("2012-09-04T15:00:00Z"), "value" : 34 } { "_id" : ISODate("2012-09-04T16:00:00Z"), "value" : 18 } { "_id" : ISODate("2012-09-04T01:00:00Z"), "value" : 13 } { "_id" : ISODate("2012-09-04T17:00:00Z"), "value" : 25 } { "_id" : ISODate("2012-09-04T18:00:00Z"), "value" : 7 }
结果是 10 分钟日志的每小时摘要,其中 _id 字段包含小时的开始,值字段包含随机数的总和。在您的情况下,您可能有不同的聚合运算符;根据您的需要修改reduce函数。
正如 Sammaye 在评论中提到的,您可以使用 cron 作业条目自动执行 map/reduce 调用,以每小时运行一次。
如果您不想每次都处理完整的日志集合,您可以通过将文档限制为每小时时间窗口来运行增量更新,如下所示:
var q = { $and: [ {timestamp: {$gte: new Date(2012, 8, 4, 12, 0, 0) }},
{timestamp: {$lt: new Date(2012, 8, 4, 13, 0, 0) }} ] }
db.logs.mapReduce(mapf, reducef, {query: q, out: { merge : "hourly_logs" }})
这将仅包括 12 到 13 小时之间的日志文件。请注意,Date() 对象中的月份值从 0 开始(8 = 九月)。由于该merge
选项,在已处理的日志文件上运行 m/r 是安全的。