14

我是 node.js 新手,我试图了解如何以节点喜欢的非阻塞方式组织一些逻辑。

我有一组环境 ['stage','prod'],另一组称为品牌 ['A','B','C'] 的参数和一组设备 ['phone','tablet'] .

在节点的回调驱动的世界中,我有这个:

brands.forEach( function(brand) {
    devices.forEach( function(device) {
        var tapeS = getTape('stage',brand,device); // bad example...tapeS never set
        var tapeP = getTape('prod' ,brand,device);
    })
} )
// more stuff here
function getTape(env,brand,device) {
   var req = http.request(someOptions,function(resp) {
       // ok, so we handle the response here, but how do I sequence this with all the other
       // responses, also happening asynchronously?
   });
}

我正在尝试为每个环境构建一个包含块的报告:

A:
    Stage -- report
    Prod  -- report 
B:    ...

我的问题是,因为这里的一切都是异步的,尤其是在 getTape 内部,它调用节点的 http.request。如何在所有这些异步奇迹结束时序列化所有内容,以便我可以按照我想要的顺序创建报告?

我听说了一些关于 javascript Promises 的事情。这会有所帮助吗,即以某种方式收集所有这些 Promise,然后等待它们全部完成,然后获取它们收集的数据?

4

5 回答 5

39

Q是 node.js 中主要的 Promise 实现。我也有自己的超轻量级承诺库Promise。我的库没有实现我在这些示例中使用的所有功能,但可以通过较小的调整使其工作。Promises/A+是Promises/A+的基础规范。它定义了一个.then方法的行为并且非常易读,所以一定要在某个时候看一下(不一定马上)。

Promise 背后的想法是它们封装了一个异步值。这使得推理如何将同步代码转换为异步代码变得更容易,因为通常有很好的并行性。作为对这些概念的介绍,我推荐我关于Promises 和 Generators的演讲或 Domenic Denicola 的演讲之一(例如Promises、PromisesCallbacks、Promises 和 Coroutines(哦,天哪!))。

首先要决定的是您是要并行发出请求,还是按顺序一次发出一个请求。从这个问题我猜你想并行地做它们。我还将假设您正在使用 Q 这意味着您必须使用以下命令安装它:

npm install q

并在您使用它的每个文件的顶部要求它:

var Q = require('q');

考虑用于打印该报告的理想数据结构,我认为您将拥有一系列品牌,以及一系列设备,这些设备将是具有属性的对象stageprod例如:

[
  {
      brand: 'A',
      devices: [
        {
          device: 'phone',
          stage: TAPE,
          prod: TAPE
        },
        {
          device: 'tablet',
          stage: TAPE,
          prod: TAPE
        }
        ...
      ]
  },
  {
      brand: 'B',
      devices: [
        {
          device: 'phone',
          stage: TAPE,
          prod: TAPE
        },
        {
          device: 'tablet',
          stage: TAPE,
          prod: TAPE
        }
        ...
      ]
  }
  ...
]

我假设如果你有这个,那么你打印出所需的报告就没有问题了。

承诺的 HTTP 请求

让我们从查看getTape函数开始。您是否希望它返回一个 node.js 流或包含整个下载文件的缓冲区/字符串?无论哪种方式,在图书馆的帮助下,您都会发现它更容易。如果您是 node.js 的新手,我建议您将request作为一个库来满足您的期望。如果你感觉更有信心,substack 的hyperquest是一个小得多的库,可以说更整洁,但它需要你手动处理重定向等事情,你可能不想进入。

流式传输(困难)

流媒体方法很棘手。如果您的磁带长度为 100 MB,则可以完成并且需要它,但承诺可能不是正确的方法。如果这是您实际遇到的问题,我很乐意更详细地研究此问题。

缓冲请求(简单)

要创建一个使用request执行缓冲 HTTP 请求并返回 Promise 的函数,这相当简单。

var Q = require('q')
var request = Q.denodeify(require('request'))

Q.denodeify只是说的捷径:“使用这个通常需要回调的函数,并给我一个接受承诺的函数”。

为此getTape,我们执行以下操作:

function getTape(env, brand, device) {
  var response = request({
    uri: 'http://example.com/' + env + '/' + brand + '/' + device,
    method: 'GET'
  })
  return response.then(function (res) {
    if (res.statusCode >= 300) {
      throw new Error('Server responded with status code ' + res.statusCode)
    } else {
      return res.body.toString() //assuming tapes are strings and not binary data
    }
  })
}

那里发生的事情是request(via Q.denodeify) 正在返回一个承诺。我们正在.then(onFulfilled, onRejected)兑现这一承诺。这将返回一个新的转换承诺。如果响应承诺被拒绝(相当于throw在同步代码中),那么转换后的承诺也是如此(因为我们没有附加onRejected处理程序)。

如果您放入其中一个处理程序,则转换后的 Promise 将被拒绝。如果您从其中一个处理程序返回一个值,则转换后的 promise 将使用该值“实现”(有时也称为“已解决”)。然后,我们可以在转换后的 Promise 结束时链接更多.then调用。

我们返回转换后的 Promise 作为我们函数的结果。

提出请求

JavaScript 有一个非常有用的函数,叫做.map. 这就像.forEach但返回一个转换后的数组。我将使用它来尽可能接近原始同步代码。

var data = brands.map(function (brand) {
  var b = {brand: brand}
  b.devices = devices.map(function (device) {
    var d = {device: device}
    d.tapeS = getTape('stage',brand,device); // bad example...tapeS never set
    d.tapeP = getTape('prod' ,brand,device);
    return d
  })
})

现在我们有了代码,它为我们提供了我一开始提出的数据结构,除了我们有Promise<TAPE>而不是TAPE.

等待请求

Q 有一个非常有用的方法,叫做Q.all. 它需要一组 Promise 并等待它们全部完成,因此让我们将我们的数据结构转换为一组 Promise 以传递给 Q.all。

一种方法是在最后,我们可以遍历每个项目并等待 Promise 解决。

var updated = Q.all(data.map(function (brand) {
  return Q.all(brand.devices.map(function (device) {
    return Q.all([device.tapeS, device.tapeP])
      .spread(function (tapeS, tapeP) {
        //update the values with the returned promises
        device.tapeS = tapeS
        device.tapeP = tapeP
      })
  })
}))

//if you add a line that reads `updated = updated.thenResolve(data)`,
//updated would become a promise for the data structure (after being resolved)

updated.then(function () {
  // `data` structure now has no promises in it and is ready to be printed
})

另一种方法是边做边做,以便“提出请求”代码被替换为:

var data = Q.all(brands.map(function (brand) {
  var b = {brand: brand}
  Q.all(devices.map(function (device) {
    var d = {device: device}
    var tapeSPromise = getTape('stage',brand,device);
    var tapePPromise = getTape('prod' ,brand,device);
    return Q.all([tapeSPromise, tapePPromise])
      .spread(function (tapeS, tapeP) { //now these are the actual tapes
        d.tapeS = tapeS
        d.tapeP = tapeP
        return d
      })
  }))
  .then(function (devices) {
    b.devices = devices
    return b
  })
}))

data.then(function (data) {
  // `data` structure now has no promises in it and is ready to be printed
})

还有一种方法是使用一个小型实用程序库来对对象进行递归深度解析。我还没准备好发布它,但是这个实用函数(从 Kriskowal 的工作中借来的)做了一个深刻的解决,它可以让你使用:

var data = deep(brands.map(function (brand) {
  var b = {brand: brand}
  b.devices = devices.map(function (device) {
    var d = {device: device}
    d.tapeS = getTape('stage',brand,device); // bad example...tapeS never set
    d.tapeP = getTape('prod' ,brand,device);
    return d
  })
}))

data.then(function (data) {
  // `data` structure now has no promises in it and is ready to be printed
})

获得最终数据的承诺。

于 2013-05-30T22:18:36.913 回答
1

我对 node.js 也很陌生,最近我发现了一些库,它们在以各种方式组织异步回调方面特别有效。然而,到目前为止,我最喜欢的是caolan 的 async。它有一些有用的模式,但我发现最有用的是 async.series、async.parallel、async.waterfall。第一个,async.series,只是按线性顺序执行异步函数:

async.series([
function(callback){
    // do some stuff ...
    callback(null, 'one');
},
function(callback){
    // do some more stuff ...
    callback(null, 'two');
}
],
// optional callback
function(err, results){
    // results is now equal to ['one', 'two']
});

第二个,async.parallel,简单地同时执行函数:

async.parallel([
function(callback){
    setTimeout(function(){
        callback(null, 'one');
    }, 200);
},
function(callback){
    setTimeout(function(){
        callback(null, 'two');
    }, 100);
}
],
// optional callback
function(err, results){
    // the results array will equal ['one','two'] even though
    // the second function had a shorter timeout.
});

最后一个,也是我最喜欢的,和前面提到的 async.series 一样,不过也是把上一个函数的结果传递给下一个:

async.waterfall([
function(callback){
    callback(null, 'one', 'two');
},
function(arg1, arg2, callback){
    callback(null, 'three');
},
function(arg1, callback){
    // arg1 now equals 'three'
    callback(null, 'done');
}
], function (err, result) {
    // result now equals 'done'    
});

嗯,这是我的作品。在我看来,这只是格式化节点疯狂的非阻塞架构的最简单方法。如果您需要更多帮助,请给我发送 PM。我知道随着更大、更复杂的代码库,node.js 会变得多么令人生畏。

干杯。

于 2013-05-30T21:10:21.233 回答
0

作为初学者,您现在可能希望继续使用回调和简单的流控制库。在你很好地掌握了回调和延续传递风格之后,再看看 Promise。

这是使用队列库的简单方法,例如:

var queue = require('queue-async')

var q = queue()

brands.forEach(function(brand){
    brand.devices.forEach(function(device){
        q.defer(getTape.bind(null, 'stage', brand, device))
        q.defer(getTape.bind(null, 'prod', brand, device))
    })
})

q.awaitAll(function(error, results){
    // use result pairs here
    console.log(results)
})
于 2013-06-14T03:50:52.260 回答
0

Promise 的另一种选择是使用该async模块:

async.map(brands, function(brand, brand_cb) {
    async.map(brand.devices, function(device, device_cb) {
        async.parallel({
            stage: function(cb) {
                // ...
                cb(null, stage_data)
            },
            prod: function(cb) {
                // ...
                cb(null, prod_data)
            }
        }, function(err, data) {
            device_cb(null, {name: device, data: data});
        });
    }, function(err, data) {
        brand_cb(null, {name: brand, devices: data});
    });
}, function(err, all_the_results) {
    console.log(all_the_results[0].devices[0].data.prod;
});
于 2013-05-30T21:25:25.367 回答
0

如果你对使用 Promise 感兴趣,可以看看我的Faithful库。它模仿了许多功能的 Async API,并且还具有您简要提到的“收集”功能。

请注意,到目前为止,faith.parallel 只接受一个数组,而不是一个哈希。那还有待实施。

于 2013-05-30T21:57:27.910 回答