5

我有一个通过侦听dataerrorend事件来处理的流,我调用一个函数来处理data第一个流中的每个事件。自然,处理数据的函数调用其他回调,使其异步。那么在处理流中的数据时如何开始执行更多代码呢?监听end流中的事件并不意味着异步data处理功能已经完成。

如何确保在执行下一条语句时完成流数据处理功能?

这是一个例子:

function updateAccountStream (accountStream, callThisOnlyAfterAllAccountsAreMigrated) {
  var self = this;
  var promises = [];
  accountStream
    .on('data', function (account) {
      migrateAccount.bind(self)(account, finishMigration);
    })
    .on('error', function (err) {
      return console.log(err);
    })
    .on('end', function () {
      console.log("Finished updating account stream (but finishMigration is still running!!!)");
      callThisOnlyAfterAllAccountsAreMigrated() // finishMigration is still running!
    });
}

var migrateAccount = function (oldAccount, callback) {
  executeSomeAction(oldAccount, function(err, newAccount) {
    if (err) return console.log("error received:", err);
    return callback(newAccount);
  });
}

var finishMigration = function (newAccount) {
  // some code that is executed asynchronously...
}

如何确保callThisOnlyAfterAllAccountsAreMigrated在处理流之后调用它?

这可以通过承诺来完成吗?可以通过流来完成吗?我正在使用 Nodejs,因此引用其他 npm 模块可能会有所帮助。

4

3 回答 3

2

正如你所说,在流上监听end事件本身是没有用的。流不知道也不关心您对data处理程序中的数据做了什么,因此您需要编写一些代码来跟踪您自己的 migrateAccount 状态。

如果是我,我会重写整个部分。如果您在流中使用readable事件.read(),您可以一次读取任意数量的项目,只要您想处理。如果是一个,没问题。如果是30,那就太好了。这样做的原因是,您不会超出处理来自流的数据的任何工作。就目前而言,如果 accountStream 速度很快,您的应用程序无疑会在某个时候崩溃。

当您从流中读取一个项目并开始工作时,接受您返回的承诺(使用 Bluebird 或类似的)并将其放入数组中。当 promise 解决后,将它从数组中移除。当流结束时,附加一个.done()处理程序.all()(基本上从仍然在数组中的每个承诺中做出一个大承诺)。

您还可以对正在进行的工作使用简单的计数器。

于 2015-05-26T18:50:30.100 回答
1

使用直通流(npm through2模块),我使用以下控制异步行为的代码解决了这个问题:

var through = require('through2').obj;
function updateAccountStream (accountStream, callThisOnlyAfterAllAccountsAreMigrated) {
  var self = this;
  var promises = [];
  accountStream.pipe(through(function(account, _, next) {
    migrateAccount.bind(self)(account, finishMigration, next);
  }))
    .on('data', function (account) {
    })
    .on('error', function (err) {
      return console.log(err);
    })
    .on('end', function () {
      console.log("Finished updating account stream");
      callThisOnlyAfterAllAccountsAreMigrated();
    });
}

var migrateAccount = function (oldAccount, callback, next) {
  executeSomeAction(oldAccount, function(err, newAccount) {
    if (err) return console.log("error received:", err);
    return callback(newAccount, next);
  });
}

var finishMigration = function (newAccount, next) {
  // some code that is executed asynchronously, but using 'next' callback when migration is finished...
}
于 2015-05-27T05:18:21.093 回答
1

通过 Promise 处理流时会容易得多。

这里复制,一个使用spex库的示例:

var spex = require('spex')(Promise);
var fs = require('fs');

var rs = fs.createReadStream('values.txt');

function receiver(index, data, delay) {
    return new Promise(function (resolve) {
        console.log("RECEIVED:", index, data, delay);
        resolve(); // ok to read the next data;
    });
}

spex.stream.read(rs, receiver)
    .then(function (data) {
        // streaming successfully finished;
        console.log("DATA:", data);
    }, function (reason) {
        // streaming has failed;
        console.log("REASON:", reason);
    });
于 2015-10-20T00:42:02.587 回答