2

我熟悉 MongoDB oplog 和 change-streams,如下所示:

  const changeStream = d.db(dbName).collection(dbCollName).watch();

  changeStream.on('change', next => {
    log.info('doc:', next.fullDocument);
  });

但我的问题是,有没有办法获取最新的 100 个文档,然后阅读更改而不会丢失和文档?就像tail -f但是对于 mongodb 上限集合?

4

1 回答 1

0

据我所知,变更流不支持此功能,因此您需要手动编写。类似以下的内容应该首先从数据库中获取最后 100 个文档并记录这些文档,然后再开始记录更改流看到的文档。

  const conn = await mongodb.connect("mongodb://mongo:27017")
  const db = conn.db("test");
  const coll = db.collection(collName);
  // We want to start watching before doing the fetch of the last 100 docs
  const changeStream = coll.watch();
  let changeStreamDocs = [];
  let hasLoggedInitialDocs = false;
  // we need to make sure we don't log the same doc twice if we see if in the change stream & in the `find`
  const initialDocIds = {};
  function logChangeStreamDocs() {
    // We need to wait for `coll.find` to complete before we can start logging the change stream docs
    if (!hasLoggedInitialDocs) return;
    for (const doc of changeStreamDocs) {
      if (initialDocIds[doc._id]) continue;
      console.log(doc);
    }
    changeStreamDocs = [];
  }
  changeStream.on('change', next => {
    changeStreamDocs.push(next.fullDocument)
    logChangeStreamDocs();
  });
  // find the most recent 100 docs. _id: -1 is descending
  const docs = await coll.find({}, { sort: { _id: -1 }, limit: 100 }).toArray();
  for (const doc of docs) {
    console.log(doc);
    initialDocIds[doc._id] = true;
  }
  loggedInitialDocs = true;
  logChangeStreamDocs();

注意:我手边没有副本集,所以我还没有实际测试过。里面可能有几个错别字

于 2019-10-04T16:25:20.130 回答