5

我的程序正在与每秒仅接受约 10 个请求的 Web 服务进行通信。有时,我的程序会向 Web 服务发送 100 多个并发请求,导致我的程序崩溃。

如何将 Node.js 中的并发请求限制为每秒 5 个?我正在使用请求库。

 // IF EVENT AND SENDER
    if(data.sender[0].events && data.sender[0].events.length > 0) {


        // FIND ALL EVENTS
        for(var i = 0; i < data.sender[0].events.length; i++) {

            // IF TYPE IS "ADDED"
            if(data.sender[0].events[i].type == "added") {
                switch (data.sender[0].events[i].link.rel) {
                    case "contact" :
                        batch("added", data.sender[0].events[i].link.href);
                        //_initContacts(data.sender[0].events[i].link.href);
                        break;
                } 
            // IF TYPE IS "UPDATED"
            } else if(data.sender[0].events[i].type == "updated") {

                switch (data.sender[0].events[i].link.rel){                     
                    case "contactPresence" :
                        batch("updated", data.sender[0].events[i].link.href);
                        //_getContactPresence(data.sender[0].events[i].link.href);
                        break;
                    case "contactNote" :
                        batch("updated", data.sender[0].events[i].link.href);
                        // _getContactNote(data.sender[0].events[i].link.href);
                        break;
                    case "contactLocation" :
                        batch("updated", data.sender[0].events[i].link.href);
                        // _getContactLocation(data.sender[0].events[i].link.href);
                        break;
                    case "presenceSubscription" :
                        batch("updated", data.sender[0].events[i].link.href);
                        // _extendPresenceSubscription(data.sender[0].events[i].link.href);
                        break;
                }
            }
        };

然后是自制的批处理方法:

var updated = [];
var added = [];

var batch = function(type, url){
    console.log("batch called");


    if (type === "added"){
        console.log("Added batched");
        added.push(url);
        if (added.length > 5) {
            setTimeout(added.forEach(function(req){
                _initContacts(req);
            }), 2000);
            added = [];
        }
    } 
    else if (type === "updated"){
        console.log("Updated batched");
        updated.push(url);
        console.log("Updated length is : ", updated.length);
        if (updated.length > 5){
            console.log("Over 5 updated events");
            updated.forEach(function(req){
                setTimeout(_getContactLocation(req), 2000);
            });
            updated = [];
        }
    }       
};

以及实际请求的示例:

var _getContactLocation = function(url){
    r.get(baseUrl + url, 
    { "strictSSL" : false, "headers" : { "Authorization" : "Bearer " + accessToken }}, 
        function(err, res, body){
            if(err)
                console.log(err);
            else {
                var data = JSON.parse(body);
                self.emit("data.contact", data);
            }
        }
    );
};
4

2 回答 2

11

使用异步库,该mapLimit函数完全符合您的要求。由于您没有提供任何代码,因此我无法为您的特定用例提供示例。

从自述文件:


mapLimit(arr,限制,迭代器,回调)

与 map 相同,只有不超过“限制”的迭代器将随时同时运行。

请注意,这些项目不是批量处理的,因此不能保证第一个“限制”迭代器函数将在任何其他迭代器函数启动之前完成。

论据

  • arr - 要迭代的数组。
  • limit - 任何时候运行的最大迭代器数。
  • iterator(item, callback) - 应用于数组中每个项目的函数。迭代器被传递一个回调(err,transformed),一旦它完成一个错误(可以为空)和一个转换的项目,就必须调用它。
  • callback(err, results) - 在所有迭代器函数完成或发生错误后调用的回调。结果是来自原始数组的转换项的数组。

例子

async.mapLimit(['file1','file2','file3'], 1, fs.stat, function(err, results){ // results is now an array of stats for each file });


编辑:既然您提供了代码,我发现您的使用与我假设的有点不同。当您知道要预先运行的所有任务时,该async库会更有用。我不知道有一个图书馆可以轻松为您解决这个问题。上面的注释可能仍然与搜索此主题的人相关,因此我将其保留。

抱歉,我没有时间重构您的代码,但这是一个(未经测试)的函数示例,该函数发出异步请求,同时自我限制为每秒 5 个请求。我强烈建议解决此问题,以提出适合您的代码库的更通用的解决方案。

var throttledRequest = (function () {
    var queue = [], running = 0;

    function sendPossibleRequests() {
        var url;
        while (queue.length > 0 && running < 5) {
            url = queue.shift();
            running++;
            r.get(url, { /* YOUR OPTIONS HERE*/ }, function (err, res, body) {
                running--;
                sendPossibleRequests();

                if(err)
                    console.log(err);
                else {
                    var data = JSON.parse(body);
                    self.emit("data.contact", data);
                }
            });
        }
    }

    return function (url) {
        queue.push(url);
        sendPossibleRequests();
    };
})();

基本上,您保留所有要异步处理的数据的队列(例如要请求的 url),然后在每次回调(来自请求)之后,您尝试启动尽可能多的剩余请求。

于 2013-08-19T17:52:57.333 回答
6

这正是节点的Agent类旨在解决的问题。您是否做过一些愚蠢的事情或作为请求选项require('http').globalAgent.maxSockets = Number.MAX_VALUE传递?agent: false

使用 Node 的默认行为,您的程序一次不会发送超过 5 个并发请求。此外,Agent 提供了简单队列无法实现的优化(即 HTTP keepalives)。

如果您尝试发出许多请求(例如,从一个循环发出 100 个请求),前 5 个请求将开始,代理将排队剩余的 95 个。请求完成后,它将开始下一个请求。

您可能想要做的是Agent为您的 Web 服务请求创建一个,并将其传递给每个请求调用(而不是将请求与全局代理混合)。

var http=require('http'), svcAgent = http.Agent();

request({ ... , agent: svcAgent });
于 2013-08-19T19:43:14.090 回答