79

node.js 的事件驱动编程模型使得协调程序流有些棘手。

简单的顺序执行变成了嵌套回调,这很容易(尽管写下来有点复杂)。

但是并行执行呢?假设您有三个可以并行运行的任务 A、B、C,当它们完成后,您希望将它们的结果发送给任务 D。

使用 fork/join 模型,这将是

  • 前叉
  • 前叉 B
  • 叉子 C
  • 加入A,B,C,运行D

我如何在 node.js 中编写它?有没有最佳实践或食谱?我是否每次都必须手动推出解决方案,或者是否有一些图书馆为此提供帮助?

4

7 回答 7

128

node.js 中没有什么是真正并行的,因为它是单线程的。但是,可以安排多个事件并按照您事先无法确定的顺序运行。像数据库访问这样的事情实际上是“并行的”,因为数据库查询本身在单独的线程中运行,但在完成时会重新集成到事件流中。

那么,如何在多个事件处理程序上安排回调呢?好吧,这是浏览器端 javascript 动画中使用的一种常用技术:使用变量来跟踪完成情况。

这听起来像是一种 hack,而且确实如此,而且它可能会留下一堆全局变量来进行跟踪,并且用一种较少的语言来进行跟踪。但是在javascript中我们可以使用闭包:

function fork (async_calls, shared_callback) {
  var counter = async_calls.length;
  var callback = function () {
    counter --;
    if (counter == 0) {
      shared_callback()
    }
  }

  for (var i=0;i<async_calls.length;i++) {
    async_calls[i](callback);
  }
}

// usage:
fork([A,B,C],D);

在上面的示例中,我们通过假设异步和回调函数不需要参数来保持代码简单。您当然可以修改代码以将参数传递给异步函数,并让回调函数累积结果并将其传递给 shared_callback 函数。


补充回答:

实际上,即使是这样,该fork()函数也可以使用闭包将参数传递给异步函数:

fork([
  function(callback){ A(1,2,callback) },
  function(callback){ B(1,callback) },
  function(callback){ C(1,2,callback) }
],D);

剩下要做的就是累积 A、B、C 的结果并将它们传递给 D。


更多附加答案:

我无法抗拒。早餐的时候一直在想这个。这是一个fork()累积结果的实现(通常作为参数传递给回调函数):

function fork (async_calls, shared_callback) {
  var counter = async_calls.length;
  var all_results = [];
  function makeCallback (index) {
    return function () {
      counter --;
      var results = [];
      // we use the arguments object here because some callbacks 
      // in Node pass in multiple arguments as result.
      for (var i=0;i<arguments.length;i++) {
        results.push(arguments[i]);
      }
      all_results[index] = results;
      if (counter == 0) {
        shared_callback(all_results);
      }
    }
  }

  for (var i=0;i<async_calls.length;i++) {
    async_calls[i](makeCallback(i));
  }
}

这很容易。这fork()具有相当通用的用途,可用于同步多个非同质事件。

Node.js 中的示例用法:

// Read 3 files in parallel and process them together:

function A (c){ fs.readFile('file1',c) };
function B (c){ fs.readFile('file2',c) };
function C (c){ fs.readFile('file3',c) };
function D (result) {
  file1data = result[0][1];
  file2data = result[1][1];
  file3data = result[2][1];

  // process the files together here
}

fork([A,B,C],D);

更新

这段代码是在 async.js 等库或各种基于 Promise 的库出现之前编写的。我想相信 async.js 是受此启发的,但我没有任何证据。无论如何..如果你今天打算这样做,请查看 async.js 或 promises。只需考虑上面的答案即可很好地解释/说明诸如 async.parallel 之类的工作方式。

为了完整起见,以下是您的操作方式async.parallel

var async = require('async');

async.parallel([A,B,C],D);

请注意,它async.parallel的工作方式与我们上面实现的功能完全相同fork。主要区别在于它根据 node.js 约定将错误作为第一个参数传递给D,并将回调作为第二个参数传递。

使用 Promise,我们可以这样写:

// Assuming A, B & C return a promise instead of accepting a callback

Promise.all([A,B,C]).then(D);
于 2011-01-08T01:54:29.867 回答
10

我相信现在“async”模块提供了这种并行功能,与上面的fork功能大致相同。

于 2011-09-24T05:23:11.073 回答
5

futures模块有一个我喜欢使用的名为join的子模块:

将异步调用连接在一起,类似于pthread_join线程的工作方式。

自述文件展示了一些使用它自由风格或使用Promise 模式使用未来子模块的好例子。文档中的示例:

var Join = require('join')
  , join = Join()
  , callbackA = join.add()
  , callbackB = join.add()
  , callbackC = join.add();

function abcComplete(aArgs, bArgs, cArgs) {
  console.log(aArgs[1] + bArgs[1] + cArgs[1]);
}

setTimeout(function () {
  callbackA(null, 'Hello');
}, 300);

setTimeout(function () {
  callbackB(null, 'World');
}, 500);

setTimeout(function () {
  callbackC(null, '!');
}, 400);

// this must be called after all 
join.when(abcComplete);
于 2012-02-20T04:47:09.433 回答
2

这里可能有一个简单的解决方案:http: //howtonode.org/control-flow-part-ii滚动到并行操作。另一种方法是让 A、B 和 C 都共享相同的回调函数,让该函数具有全局或至少是函数外的增量器,如果所有三个都调用了回调,则让它运行 D,当然,您还必须将 A、B 和 C 的结果存储在某处。

于 2011-01-08T01:41:15.150 回答
2

另一个选项可能是 Node 的 Step 模块:https ://github.com/creationix/step

于 2011-04-30T00:27:38.320 回答
0

你可能想试试这个小库:https ://www.npmjs.com/package/parallel-io

于 2014-12-22T20:45:04.737 回答
0

除了流行的 Promise 和 async-library,还有第三种优雅的方式——使用“接线”:

var l = new Wire();

funcA(l.branch('post'));
funcB(l.branch('comments'));
funcC(l.branch('links'));

l.success(function(results) {
   // result will be object with results:
   // { post: ..., comments: ..., links: ...}
});

https://github.com/garmoshka-mo/mo-wire

于 2015-10-19T15:45:44.970 回答