据我所知,变更流不支持此功能,因此您需要手动编写。类似以下的内容应该首先从数据库中获取最后 100 个文档并记录这些文档,然后再开始记录更改流看到的文档。
const conn = await mongodb.connect("mongodb://mongo:27017")
const db = conn.db("test");
const coll = db.collection(collName);
// We want to start watching before doing the fetch of the last 100 docs
const changeStream = coll.watch();
let changeStreamDocs = [];
let hasLoggedInitialDocs = false;
// we need to make sure we don't log the same doc twice if we see if in the change stream & in the `find`
const initialDocIds = {};
function logChangeStreamDocs() {
// We need to wait for `coll.find` to complete before we can start logging the change stream docs
if (!hasLoggedInitialDocs) return;
for (const doc of changeStreamDocs) {
if (initialDocIds[doc._id]) continue;
console.log(doc);
}
changeStreamDocs = [];
}
changeStream.on('change', next => {
changeStreamDocs.push(next.fullDocument)
logChangeStreamDocs();
});
// find the most recent 100 docs. _id: -1 is descending
const docs = await coll.find({}, { sort: { _id: -1 }, limit: 100 }).toArray();
for (const doc of docs) {
console.log(doc);
initialDocIds[doc._id] = true;
}
loggedInitialDocs = true;
logChangeStreamDocs();
注意:我手边没有副本集,所以我还没有实际测试过。里面可能有几个错别字