1

这听起来像是Qor之类的库的一个非常典型的用例async,但我真的不知道什么是最好的方法。

我想导入一个 150 行的 CSV 文件(使用node-csv),并为每一行创建一个 mongo 文档。但是,流解析似乎比 'db inserts' 完成得更快,所以我遇到了回调被调用得太早的问题。

// importtest.mocha.js
[...]
importer.loadFromCsv (url, function(result) {
 result.length.should.be.equal (150); // nope, it's always around 41
}


// importer.js 
function loadFromCsv (url, callback){
  csv().from.stream(url)
    .on ('record', function(record, index){ 
       new Row({data: record}).save(function() {
         console.log ('saved a row to db');
       }); 
    })
    .on ('end', function() {
      callback (Row.find({})); // E parser finished, but probably not all Row.save()
    });
}

那么,请任何人给我一个提示,我可以如何使用 async/promises 来解决这个问题,以便当流解析/数据库插入是异步的时,最终回调将仅在所有插入完成后完成?

4

1 回答 1

1

当您插入许多记录时,您应该单独注意每条记录。这是一个未经测试的代码片段,您可以尝试调整。实际上,您创建了一个承诺列表,当所有承诺都得到解决时,传递给 then(fn) 的函数被触发。好吧,正如代码中提到的,您应该注意有错误的记录。请注意,传递给 then(fn) 的函数只有在所有承诺都得到解决(成功)时才会执行。为了向记录表明承诺错误,您应该使用 defer.reject() 而不是 def.resolve()。还为 onErrorFn 占位符传递一个函数。它类似于 SQL 事务。

下面是包含注释的代码:

var q = require('q');

function loadFromCsv (url, callback){
  // create an array holding all promises
  var csv_promises = [];

  csv().from.stream(url)
    .on ('record', function(record, index){ 
      // create new defer object, per row
      var row_defer = q.defer();

      // make sure, this function gets called, only after the row got saved
      new Row({data: record}).save(function() {
        console.log ('saved a row to db');

        // resolves the promise, per row
        row_defer.resolve(record);

        // todo: take care for an error, per row
      });

      csv_promises.push(row_defer.promise);     // add promise to promise list, per row
    })
    .on ('end', function() {
      // callback (Row.find({})); // E parser finished, but probably not all Row.save()

      // q.all gets resolved and fires passed function as soon as ALL promises in csv_promises array are resolved
      // todo: take care for errors
      q.all(csv_promises).then(function() {
        callback( csv_promises );
      } /*, onErrorFn */ );
}

loadFromCsv( "URL", function(rows) {
    console.log("Treated rows: ", rows.length);
});
于 2013-12-14T11:55:31.560 回答