我想使用 MongoDB 更改流来监视要填充的第一个集合的插入/更新,当满足条件时,另一个具有从监视集合中提取的计算值的集合。
按照 Mongodb教程,我得出以下结果:
require('dotenv').config();
const { MongoClient } = require('mongodb');
const stream = require('stream');
const es = require('event-stream');
async function monitorListingsUsingStreamAPI(client, pipeline = []) {
const collection = client
.db(process.env.MONGO_DB)
.collection(process.env.COLLECTION_TO_MONITOR);
const changeStream = collection.watch(pipeline);
const collection_dest = client
.db(process.env.MONGO_DB)
.collection(process.env.COLLECTION_TO_POPULATE);
changeStream.pipe(
es.map(function (doc, next) {
const { _id, ...data } = doc.fullDocument;
const new_doc = { size: data.samples.length, data };
(async () => {
await collection_dest.insertOne(new_doc, next);
})();
}),
);
}
async function main() {
const uri = process.env.MONGO_DB_URI;
const client = new MongoClient(uri, {
useUnifiedTopology: true,
useNewUrlParser: true,
});
try {
// Connect to the MongoDB cluster
await client.connect();
const pipeline = [
{
$match: {
operationType: 'insert',
'fullDocument.samples': { $size: 3 },
},
},
];
// Monitor new listings using the Stream API
await monitorListingsUsingStreamAPI(client, pipeline);
}
}
实际上它似乎有效,但我曾经使用event-stream
MongoDBpipe
将流更改为另一个流,在该流中我使用立即调用的匿名异步函数来填充第二个集合。
我想知道这种方法是否正确?如何使用转换流?