68

使用 mongoskin,我可以做这样的查询,它会返回一个游标:

myCollection.find({}, function(err, resultCursor) {
      resultCursor.each(function(err, result) {

      }
}

但是,我想为每个文档调用一些异步函数,并且在回调之后才移动到光标上的下一项(类似于 async.js 模块中的 eachSeries 结构)。例如:

myCollection.find({}, function(err, resultCursor) {
      resultCursor.each(function(err, result) {

            externalAsyncFunction(result, function(err) {
               //externalAsyncFunction completed - now want to move to next doc
            });

      }
}  

我怎么能这样做?

谢谢

更新:

我不想使用toArray(),因为这是一个大批量操作,结果可能一次不适合内存。

4

9 回答 9

83

使用async/的更现代的方法await

const cursor = db.collection("foo").find({});
while(await cursor.hasNext()) {
  const doc = await cursor.next();
  // process doc here
}

笔记:

  • 当异步迭代器到达时,这可能会简单。
  • 您可能需要添加 try/catch 以进行错误检查。
  • 包含函数应该是async或者代码应该被包裹,(async function() { ... })()因为它使用await.
  • 如果需要,可以await new Promise(resolve => setTimeout(resolve, 1000));在 while 循环的末尾添加(暂停 1 秒)以表明它确实一个接一个地处理文档。
于 2017-07-05T06:03:51.273 回答
50

如果您不想使用 toArray 将所有结果加载到内存中,则可以使用光标进行迭代,如下所示。

myCollection.find({}, function(err, resultCursor) {
  function processItem(err, item) {
    if(item === null) {
      return; // All done!
    }

    externalAsyncFunction(item, function(err) {
      resultCursor.nextObject(processItem);
    });

  }

  resultCursor.nextObject(processItem);
}  
于 2013-08-08T07:00:09.297 回答
29

从 node.js v10.3 开始,您可以使用异步迭代器

const cursor = db.collection('foo').find({});
for await (const doc of cursor) {
  // do your thing
  // you can even use `await myAsyncOperation()` here
}

Jake Archibald 写了一篇关于异步迭代器的精彩博客文章,我在阅读@user993683 的回答后才知道。

于 2019-06-16T14:51:52.560 回答
11

这适用于使用 setImmediate 处理大型数据集:

var cursor = collection.find({filter...}).cursor();

cursor.nextObject(function fn(err, item) {
    if (err || !item) return;

    setImmediate(fnAction, item, arg1, arg2, function() {
        cursor.nextObject(fn);
    });
});

function fnAction(item, arg1, arg2, callback) {
    // Here you can do whatever you want to do with your item.
    return callback();
}
于 2016-08-17T13:16:36.467 回答
4

如果有人正在寻找一种 Promise 方式来执行此操作(而不是使用 nextObject 的回调),那么就是这样。我正在使用 Node v4.2.2 和 mongo 驱动程序 v2.1.7。这是一种 asyncSeries 版本Cursor.forEach()

function forEachSeries(cursor, iterator) {
  return new Promise(function(resolve, reject) {
    var count = 0;
    function processDoc(doc) {
      if (doc != null) {
        count++;
        return iterator(doc).then(function() {
          return cursor.next().then(processDoc);
        });
      } else {
        resolve(count);
      }
    }
    cursor.next().then(processDoc);
  });
}

要使用它,请传递光标和一个异步操作每个文档的迭代器(就像您对 Cursor.forEach 所做的那样)。迭代器需要返回一个承诺,就像大多数 mongodb 原生驱动程序函数一样。

比如说,您想更新集合中的所有文档test。你会这样做:

var theDb;
MongoClient.connect(dbUrl).then(function(db) {
  theDb = db;     // save it, we'll need to close the connection when done.
  var cur = db.collection('test').find();

  return forEachSeries(cur, function(doc) {    // this is the iterator
    return db.collection('test').updateOne(
      {_id: doc._id},
      {$set: {updated: true}}       // or whatever else you need to change
    );
    // updateOne returns a promise, if not supplied a callback. Just return it.
  });
})
.then(function(count) {
  console.log("All Done. Processed", count, "records");
  theDb.close();
})
于 2016-06-18T15:50:03.647 回答
2

您可以使用异步库执行类似的操作。这里的关键是检查当前文档是否为空。如果是,则表示您已完成。

async.series([
        function (cb) {
            cursor.each(function (err, doc) {
                if (err) {
                    cb(err);
                } else if (doc === null) {
                    cb();
                } else {
                    console.log(doc);
                    array.push(doc);
                }
            });
        }
    ], function (err) {
        callback(err, array);
    });
于 2014-01-08T22:56:24.227 回答
0

你可以使用未来:

myCollection.find({}, function(err, resultCursor) {
    resultCursor.count(Meteor.bindEnvironment(function(err,count){
        for(var i=0;i<count;i++)
        {
            var itemFuture=new Future();

            resultCursor.nextObject(function(err,item)){
                itemFuture.result(item);
            }

            var item=itemFuture.wait();
            //do what you want with the item, 
            //and continue with the loop if so

        }
    }));
});
于 2014-12-23T12:04:10.243 回答
0

You can get the result in an Array and iterate using a recursive function, something like this.

myCollection.find({}).toArray(function (err, items) {
    var count = items.length;
    var fn = function () {
        externalAsyncFuntion(items[count], function () {
            count -= 1;
            if (count) fn();
        })
    }

    fn();
});

Edit:

This is only applicable for small datasets, for larger one's you should use cursors as mentioned in other answers.

于 2013-08-08T06:57:18.620 回答
-2

您可以使用简单的 setTimeOut。这是在 nodejs 上运行的 typescript 中的一个示例(我通过 'when' 模块使用 Promise,但也可以在没有它们的情况下完成):

        import mongodb = require("mongodb");

        var dbServer = new mongodb.Server('localhost', 27017, {auto_reconnect: true}, {});
        var db =  new mongodb.Db('myDb', dbServer);

        var util = require('util');
        var when = require('when'); //npm install when

        var dbDefer = when.defer();
        db.open(function() {
            console.log('db opened...');
            dbDefer.resolve(db);
        });

        dbDefer.promise.then(function(db : mongodb.Db){
            db.collection('myCollection', function (error, dataCol){
                if(error) {
                    console.error(error); return;
                }

                var doneReading = when.defer();

                var processOneRecordAsync = function(record) : When.Promise{
                    var result = when.defer();

                    setTimeout (function() {
                        //simulate a variable-length operation
                        console.log(util.inspect(record));
                        result.resolve('record processed');
                    }, Math.random()*5);

                    return result.promise;
                }

                var runCursor = function (cursor : MongoCursor){
                    cursor.next(function(error : any, record : any){
                        if (error){
                            console.log('an error occurred: ' + error);
                            return;
                        }
                        if (record){
                            processOneRecordAsync(record).then(function(r){
                                setTimeout(function() {runCursor(cursor)}, 1);
                            });
                        }
                        else{
                            //cursor up
                            doneReading.resolve('done reading data.');
                        }
                    });
                }

                dataCol.find({}, function(error, cursor : MongoCursor){
                    if (!error)
                    {
                        setTimeout(function() {runCursor(cursor)}, 1);
                    }
                });

                doneReading.promise.then(function(message : string){
                    //message='done reading data'
                    console.log(message);
                });
            });
        });
于 2014-03-19T22:51:19.583 回答