3

我有许多事件文档,每个事件都有许多字段,但与我的查询相关的是:

  • person_id - 对触发事件的人的引用
  • event - 用于标识事件的字符串键
  • 发生时间 - 事件发生时间的 UTC

我想要实现的是:

  • 对于事件键的列表,例如 `['event_1','event_2', 'event_3']
  • 按顺序获取执行每个事件和该事件之前的所有事件的人数,即:
    • 执行 event_1 的人数
    • 执行 event_1,然后执行 event_2 的人数
    • 执行 event_1,然后 event_2,然后 event_3 的人数
    • ETC
  • 次要目标是能够获得每个事件的平均发生日期,以便我可以计算每个事件之间的平均时间

我得到的最好的是以下两个地图减少:

db.events.mapReduce(function () {
  emit(this.person_id, {
    e: [{
      e: this.event,
      o: this.occurred_at
    }]
  })
}, function (key, values) {
  return {
    e: [].concat.apply([], values.map(function (x) {
      return x.e
    }))
  }
}, {
  query: {
    account_id: ObjectId('52011239b1b9229f92000003'),
    event: {
      $in: ['event_a', 'event_b', 'event_c','event_d','event_e','event_f']
    }
  },
  out: 'people_funnel_chains',
  sort: { person_id: 1, occurred_at: 1 }
})

接着:

db.people_funnel_chains.mapReduce(function() {
  funnel = ['event_a', 'event_b', 'event_c','event_d','event_e','event_f']
  events = this.value.e;
  for (var e in funnel) {
    e = funnel[e];
    if ((i = events.map(function (x) {
      return x.e
    }).indexOf(e)) > -1) {
      emit(e, { c: 1, o: events[i].o })
      events = events.slice(i + 1, events.length);
    } else {
      break;
    }
  }
}, function(key,values) {
    return {
        c: Array.sum(values.map(function(x) { return x.c })),
        o: new Date(Array.sum(values.map(function(x) { return x.o.getTime() }))/values.length)
    };
}, { out: {inline: 1} })

我想使用聚合框架实时实现这一点,但看不到任何方法。对于成千上万条记录,这需要 10 秒,我可以增量运行它,这意味着它足够快以接收新数据,但是如果我想修改原始查询(例如更改事件链),它就无法完成在一个请求中,我希望它能够做到。

使用 Cursor.forEach() 更新

使用 Cursor.forEach() 我已经设法在这方面取得了巨大的进步(基本上消除了对第一个地图减少的要求)。

var time = new Date().getTime(), funnel_event_keys = ['event_a', 'event_b', 'event_c','event_d','event_e','event_f'], looking_for_i = 0, looking_for = funnel_event_keys[0], funnel = {}, last_person_id = null;
for (var i in funnel_event_keys) { funnel[funnel_event_keys[i]] = [0,null] };
db.events.find({
  account_id: ObjectId('52011239b1b9229f92000003'),
  event: {
    $in: funnel_event_keys
  }
}, { person_id: 1, event: 1, occurred_at: 1 }).sort({ person_id: 1, occurred_at: 1 }).forEach(function(e) {

  var current_person_id = e['person_id'].str; 

  if (last_person_id != current_person_id) {
    looking_for_i = 0;
    looking_for = funnel_event_keys[0]
  }

  if (e['event'] == looking_for) {
    var funnel_event = funnel[looking_for]
    funnel_event[0] = funnel_event[0] + 1;
    funnel_event[1] = ((funnel_event[1] || e['occurred_at'].getTime()) + e['occurred_at'].getTime())/2;
    looking_for_i = looking_for_i + 1;
    looking_for = funnel_event_keys[looking_for_i]
  }

  last_person_id = current_person_id;
})
funnel;
new Date().getTime() - time;

我想知道内存中数据的自定义是否能够对此进行改进?从 MongoDB 中获取成千上万条记录到内存中(在另一台机器上)将是一个瓶颈,是否有一种我不知道的技术可以做到这一点?

4

1 回答 1

3

我在我的 MongoDB 博客上写了一个完整的答案,但作为总结,您要做的是根据您关心的操作来规划您的操作,将操作字段的值映射到适当的键名,逐个人聚合三个操作他们什么时候做(以及多少次)然后投射新的字段来检查action2是否在action1之后完成,以及action3是否在action2之后完成......最后阶段只是总结了只做了1或1的人数然后是 2,或者是 1,然后是 2,然后是 3。

使用函数生成聚合管道,可以根据传入的操作数组生成结果。

在我的测试案例中,整个管道在 200 毫秒内运行,收集了 40,000 个文档(这是在我的小型笔记本电脑上)。

正如正确指出的那样,我描述的一般解决方案假设,虽然演员可以多次采取任何行动,但他们只能从行动 1 前进到行动 2,但他们不能直接从行动 1 跳到行动 3(将行动顺序解释为描述先决条件在您完成操作 2 之前,您无法执行操作 3)。

事实证明,聚合框架甚至可以用于顺序完全任意的事件序列,但您仍然想知道有多少人在某个时间点执行了序列 action1、action2、action3。

对原始答案的主要调整是在中间增加一个额外的两阶段步骤。此步骤展开收集的个人文档以重新组合它,以找到在第一个动作第一次出现之后出现的第二个动作的第一次出现。

一旦我们知道最终的比较是针对 action1,然后是最早出现的 action2,并将其与最新出现的 action3 进行比较。

它可能可以概括为处理任意数量的事件,但是每两个过去的额外事件都会为聚合增加两个阶段。

这是我对管道修改的文章,以实现您正在寻找的答案。

于 2013-10-28T05:55:54.283 回答