根据我的理解,它不会一次只打开最多 user_words.length 可能的连接,因为这些是在 Promise.all 等待解决时正在完成的请求?我看不出连接是如何关闭的。
不,这是不正确的。您有两个嵌套循环,因此您可以同时打开for
多达user_words.length
*的所有循环。how many subreddits there are
请记住,rp()
不要Promise.all()
阻塞,因此您可以在for
处理任何响应之前运行嵌套循环以完成启动每个连接。
看起来您还期望以某种方式与代码行同步返回结果return top_subreddit
。你也不能那样做。您应该返回一个最终将解决为所需结果的承诺。
根据我的理解,它不会一次只打开最多 user_words.length 可能的连接,因为这些是在 Promise.all 等待解决时正在完成的请求?我看不出连接是如何关闭的。
这不是正确的理解Promise.all()
。 Promise.all()
没有阻塞。在您的代码继续退出之前,它不会“等待”所有承诺都得到解决。它的行为是异步的。您的代码继续执行循环的其他迭代,for
并在您传递给它的所有承诺完成后的某个时间Promise.all()
调用它的处理程序。.then()
循环的其他迭代for
继续运行并堆积更多套接字。
我认为解决此问题的最简单方法是创建一个您想要处理的 URL 数组,然后使用已经具有内置函数的异步库之一,以允许您在运行中运行最多 N 个异步操作同时。由于您的代码是基于承诺的,我会选择 BluebirdPromise.map()
来处理 URL 列表。
var Promise = require('bluebird');
function getBestSubreddit(messageText) {
var user_words = parse_message(messageText);
var top_subreddit = "";
var top_score = Number.MIN_SAFE_INTEGER;
return rp(dbUrl + '/.json?shallow=true').then(function(res) {
res = JSON.parse(res);
// build a list of URLs to process
var urls = [];
for (var subreddit in res) {
if (res.hasOwnProperty(subreddit)) {
for (var i = 0; i < user_words.length; i++) {
urls.push(dbUrl + '/' + subreddit + '/word_freqs/' + user_words[i] + '.json');
}
}
}
//
return Promise.map(urls, function(url) {
return rp(url);
}, {concurrency: 20}).then(function(allResults) {
// do any final processing of allResults here and return that value
// to become the resolved result of the returned promise
});
}
}
getBestSubreddit(someText).then(function(result) {
// process result here
}).catch(function(err) {
// handle error here
});
在此示例中,我将并发请求数设置为 20。您可以试验将其更改为更高或更低的数字是否会提高您的吞吐量。理想的数字取决于许多因素,包括您的本地执行环境、您请求的数据量、您拥有的带宽以及您从其发出请求的目标主机以及它如何处理同时请求。如果您过快地发出太多请求,您可能还需要关注目标的速率限制。
其他一些相关答案:
如何从 nodejs 应用程序发出数百万个并行 http 请求?
在节点 js 中。我可以使用“请求”包同时发送多少个请求
发出一百万个请求
从你的问题中我仍然不清楚你想要得到什么结果,但这是一个收集所有可能数据的版本。你最终会得到一个这种形式的对象数组,{result: result, subreddit: subreddit, word: word}
其中result
是rp()
给定 subreddit 和给定单词的结果。然后,您可以根据需要整理该组结果:
var Promise = require('bluebird');
function getBestSubreddit(messageText) {
var user_words = parse_message(messageText);
var top_subreddit = "";
var top_score = Number.MIN_SAFE_INTEGER;
return rp(dbUrl + '/.json?shallow=true').then(function(res) {
res = JSON.parse(res);
// build a list of URLs to process
var requestData = [];
for (var subreddit in res) {
if (res.hasOwnProperty(subreddit)) {
for (var i = 0; i < user_words.length; i++) {
requestData.push({url:dbUrl + '/' + subreddit + '/word_freqs/' + user_words[i] + '.json', subreddit: subreddit, word: user_words[i]});
}
}
}
//
return Promise.map(requestData, function(url) {
return rp(requestData.url).then(function(result) {
return {result: result, subreddit: requestData.subreddit, word: requestData.word};
});
}, {concurrency: 20}).then(function(allResults) {
// now filter through all the data with appropriate subreddit
// allResults is an array of objects of this form {result: result, subreddit: subreddit, word: word}
// return whatever you want the final result to be after processing the allResults array
});
}
}
getBestSubreddit(someText).then(function(result) {
// process result here
}).catch(function(err) {
// handle error here
});