1

我想使用 MongoDB 更改流来监视要填充的第一个集合的插入/更新,当满足条件时,另一个具有从监视集合中提取的计算值的集合。
按照 Mongodb教程,我得出以下结果:

require('dotenv').config();
const { MongoClient } = require('mongodb');
const stream = require('stream');
const es = require('event-stream');

async function monitorListingsUsingStreamAPI(client, pipeline = []) {
  const collection = client
    .db(process.env.MONGO_DB)
    .collection(process.env.COLLECTION_TO_MONITOR);

  
  const changeStream = collection.watch(pipeline);
  const collection_dest = client
    .db(process.env.MONGO_DB)
    .collection(process.env.COLLECTION_TO_POPULATE);   

  changeStream.pipe(
    es.map(function (doc, next) {
      const { _id, ...data } = doc.fullDocument;
      const new_doc = { size: data.samples.length, data };
      (async () => {
        await collection_dest.insertOne(new_doc, next);
      })();
    }),
  );
}

async function main() {
 
  const uri = process.env.MONGO_DB_URI;
  const client = new MongoClient(uri, {
    useUnifiedTopology: true,
    useNewUrlParser: true,
  });

  try {
    // Connect to the MongoDB cluster
    await client.connect();
    const pipeline = [
      {
        $match: {
          operationType: 'insert',
          'fullDocument.samples': { $size: 3 },
        },
      },
    ];

    //  Monitor new listings using the Stream API
    await monitorListingsUsingStreamAPI(client, pipeline);
  } 
}

实际上它似乎有效,但我曾经使用event-streamMongoDBpipe将流更改为另一个流,在该流中我使用立即调用的匿名异步函数来填充第二个集合。

我想知道这种方法是否正确?如何使用转换流?

4

0 回答 0