2

我正在尝试更新一个不久前创建的使用 nodejs 的工具(我不是 JS 开发人员,所以我试图将代码拼凑在一起)并且卡在最后一个障碍。

新功能将采用 swagger .json 定义,使用'aws-sdk'适用于 JS 的开发工具包将端点与 AWS 服务上匹配的 API 网关进行比较,然后相应地更新网关。

代码在一个小的定义文件(大约 15 个端点)上运行良好,但是一旦我给它一个更大的定义文件,我就开始收到大量TooManyRequestsException错误。

我了解这是由于我对 API Gateway 服务的调用太快,需要延迟/暂停。这就是我卡住的地方

我试过添加;

  • 每个返回的承诺的延迟()
  • 在每个承诺中运行 setTimeout()
  • 为 Promise.all 和 Promise.mapSeries 添加延迟

目前我的代码遍历定义中的每个端点,然后将每个承诺的响应添加到一个承诺数组:

promises.push(getMethodResponse(resourceMethod, value, apiName, resourcePath)); 

循环完成后,我运行以下命令:

        return Promise.all(promises)
        .catch((err) => {
            winston.error(err);
        })

我用 mapSeries 尝试过同样的方法(不走运)。

看起来 ( getMethodResponsepromise) 中的函数会立即运行,因此,无论我添加什么类型的延迟,它们都仍然只是执行。我的怀疑是我需要让 ( getMethodResponse) 返回一个函数,然后使用 mapSeries,但我也无法让它工作。

我试过的代码:包装getMethodResponse在这个:

return function(value){}

然后在循环之后添加这个(并且在循环内 - 没有区别):

 Promise.mapSeries(function (promises) {
 return 'a'();
 }).then(function (results) {
 console.log('result', results);
 });

还尝试了许多其他建议:

这里

这里

请问有什么建议吗?

编辑

根据要求,一些额外的代码可以尝试查明问题。

当前使用一小组端点的代码(在 Swagger 文件中):

module.exports = (apiName, externalUrl) => {

return getSwaggerFromHttp(externalUrl)
    .then((swagger) => {
        let paths = swagger.paths;
        let resourcePath = '';
        let resourceMethod = '';
        let promises = [];

        _.each(paths, function (value, key) {
            resourcePath = key;
            _.each(value, function (value, key) {
                resourceMethod = key;
                let statusList = [];
                _.each(value.responses, function (value, key) {
                    if (key >= 200 && key <= 204) {
                        statusList.push(key)
                    }
                });
                _.each(statusList, function (value, key) { //Only for 200-201 range  

                    //Working with small set 
                    promises.push(getMethodResponse(resourceMethod, value, apiName, resourcePath))
                });             
            });
        });

        //Working with small set
        return Promise.all(promises)
        .catch((err) => {
            winston.error(err);
        })
    })
    .catch((err) => {
        winston.error(err);
    });

};

从那以后,我尝试添加它来代替 return Promise.all():

            Promise.map(promises, function() {
            // Promise.map awaits for returned promises as well.
            console.log('X');
        },{concurrency: 5})
        .then(function() {
            return console.log("y");
        });

结果是这样的(每个端点都是一样的,有很多):

错误:TooManyRequestsException:请求过多 X 错误:TooManyRequestsException:请求过多 X 错误:TooManyRequestsException:请求过多

AWS SDK 在每个 Promise 中被调用 3 次,其功能是(从 getMethodResponse() 函数启动):

apigateway.getRestApisAsync()
return apigateway.getResourcesAsync(resourceParams)
apigateway.getMethodAsync(params, function (err, data) {}

典型的 AWS 开发工具包文档指出,这是进行太多连续调用(太快)时的典型行为。我过去遇到过类似的问题,只需在被调用的代码中添加 .delay(500) 即可解决;

就像是:

    return apigateway.updateModelAsync(updateModelParams)
    .tap(() => logger.verbose(`Updated model ${updatedModel.name}`))
    .tap(() => bar.tick())
    .delay(500)

编辑#2

我想以彻底的名义,包括我的整个.js文件。

'use strict';

const AWS = require('aws-sdk');
let apigateway, lambda;
const Promise = require('bluebird');
const R = require('ramda');
const logger = require('../logger');
const config = require('../config/default');
const helpers = require('../library/helpers');
const winston = require('winston');
const request = require('request');
const _ = require('lodash');
const region = 'ap-southeast-2';
const methodLib = require('../aws/methods');

const emitter = require('../library/emitter');
emitter.on('updateRegion', (region) => {
    region = region;
    AWS.config.update({ region: region });
    apigateway = new AWS.APIGateway({ apiVersion: '2015-07-09' });
    Promise.promisifyAll(apigateway);
});

function getSwaggerFromHttp(externalUrl) {
    return new Promise((resolve, reject) => {
        request.get({
            url: externalUrl,
            header: {
                "content-type": "application/json"
            }
        }, (err, res, body) => {
            if (err) {
                winston.error(err);
                reject(err);
            }

            let result = JSON.parse(body);
            resolve(result);
        })
    });
}

/*
    Deletes a method response
*/
function deleteMethodResponse(httpMethod, resourceId, restApiId, statusCode, resourcePath) {

    let methodResponseParams = {
        httpMethod: httpMethod,
        resourceId: resourceId,
        restApiId: restApiId,
        statusCode: statusCode
    };

    return apigateway.deleteMethodResponseAsync(methodResponseParams)
        .delay(1200)
        .tap(() => logger.verbose(`Method response ${statusCode} deleted for path: ${resourcePath}`))
        .error((e) => {
            return console.log(`Error deleting Method Response ${httpMethod} not found on resource path: ${resourcePath} (resourceId: ${resourceId})`); // an error occurred
            logger.error('Error: ' + e.stack)
        });
}

/*
    Deletes an integration response
*/
function deleteIntegrationResponse(httpMethod, resourceId, restApiId, statusCode, resourcePath) {

    let methodResponseParams = {
        httpMethod: httpMethod,
        resourceId: resourceId,
        restApiId: restApiId,
        statusCode: statusCode
    };

    return apigateway.deleteIntegrationResponseAsync(methodResponseParams)
        .delay(1200)
        .tap(() => logger.verbose(`Integration response ${statusCode} deleted for path ${resourcePath}`))
        .error((e) => {
            return console.log(`Error deleting Integration Response ${httpMethod} not found on resource path: ${resourcePath} (resourceId: ${resourceId})`); // an error occurred
            logger.error('Error: ' + e.stack)
        });
}

/*
    Get Resource
*/
function getMethodResponse(httpMethod, statusCode, apiName, resourcePath) {

    let params = {
        httpMethod: httpMethod.toUpperCase(),
        resourceId: '',
        restApiId: ''
    }

    return getResourceDetails(apiName, resourcePath)
        .error((e) => {
            logger.unimportant('Error: ' + e.stack)
        }) 
        .then((result) => {
            //Only run the comparrison of models if the resourceId (from the url passed in) is found within the AWS Gateway
            if (result) {
                params.resourceId = result.resourceId
                params.restApiId = result.apiId

                var awsMethodResponses = [];
                try {
                    apigateway.getMethodAsync(params, function (err, data) {
                        if (err) {
                            if (err.statusCode == 404) {
                                return console.log(`Method ${params.httpMethod} not found on resource path: ${resourcePath} (resourceId: ${params.resourceId})`); // an error occurred
                            }
                            console.log(err, err.stack); // an error occurred
                        }
                        else {
                            if (data) {
                                _.each(data.methodResponses, function (value, key) {
                                    if (key >= 200 && key <= 204) {
                                        awsMethodResponses.push(key)
                                    }
                                });
                                awsMethodResponses = _.pull(awsMethodResponses, statusCode); //List of items not found within the Gateway - to be removed.
                                _.each(awsMethodResponses, function (value, key) {
                                    if (data.methodResponses[value].responseModels) {
                                        var existingModel = data.methodResponses[value].responseModels['application/json']; //Check if there is currently a model attached to the resource / method about to be deleted
                                        methodLib.updateResponseAssociation(params.httpMethod, params.resourceId, params.restApiId, statusCode, existingModel); //Associate this model to the same resource / method, under the new response status
                                    }
                                    deleteMethodResponse(params.httpMethod, params.resourceId, params.restApiId, value, resourcePath)
                                        .delay(1200)
                                        .done();
                                    deleteIntegrationResponse(params.httpMethod, params.resourceId, params.restApiId, value, resourcePath)
                                        .delay(1200)
                                        .done();
                                })
                            }
                        }
                    })
                        .catch(err => {
                            console.log(`Error: ${err}`);
                        });
                }
                catch (e) {
                    console.log(`getMethodAsync failed, Error: ${e}`);
                }
            }
        })
};

function getResourceDetails(apiName, resourcePath) {

    let resourceExpr = new RegExp(resourcePath + '$', 'i');

    let result = {
        apiId: '',
        resourceId: '',
        path: ''
    }

    return helpers.apiByName(apiName, AWS.config.region)
        .delay(1200)
        .then(apiId => {
            result.apiId = apiId;

            let resourceParams = {
                restApiId: apiId,
                limit: config.awsGetResourceLimit,
            };

            return apigateway.getResourcesAsync(resourceParams)

        })
        .then(R.prop('items'))
        .filter(R.pipe(R.prop('path'), R.test(resourceExpr)))
        .tap(helpers.handleNotFound('resource'))
        .then(R.head)
        .then([R.prop('path'), R.prop('id')])
        .then(returnedObj => {
            if (returnedObj.id) {
                result.path = returnedObj.path;
                result.resourceId = returnedObj.id;
                logger.unimportant(`ApiId: ${result.apiId} | ResourceId: ${result.resourceId} | Path: ${result.path}`);
                return result;
            }
        })
        .catch(err => {
            console.log(`Error: ${err} on API: ${apiName} Resource: ${resourcePath}`);
        });
};

function delay(t) {
    return new Promise(function(resolve) { 
        setTimeout(resolve, t)
    });
 }

module.exports = (apiName, externalUrl) => {

    return getSwaggerFromHttp(externalUrl)
        .then((swagger) => {
            let paths = swagger.paths;
            let resourcePath = '';
            let resourceMethod = '';
            let promises = [];

            _.each(paths, function (value, key) {
                resourcePath = key;
                _.each(value, function (value, key) {
                    resourceMethod = key;
                    let statusList = [];
                    _.each(value.responses, function (value, key) {
                        if (key >= 200 && key <= 204) {
                            statusList.push(key)
                        }
                    });
                    _.each(statusList, function (value, key) { //Only for 200-201 range  

                        promises.push(getMethodResponse(resourceMethod, value, apiName, resourcePath))

                    });             
                });
            });

            //Working with small set
            return Promise.all(promises)
            .catch((err) => {
                winston.error(err);
            })
        })
        .catch((err) => {
            winston.error(err);
        });
};
4

1 回答 1

3

Promise.all()你显然对做什么和做什么有误解Promise.map()

Promise.all()所做的只是跟踪一整套 Promise,以告诉您它们所代表的异步操作何时全部完成(或返回错误)。当您向它传递一系列承诺时(正如您所做的那样),所有这些异步操作都已经并行启动。因此,如果您试图限制同时运行的异步操作的数量,那么此时已经为时已晚。因此,Promise.all()它本身并不能帮助您以任何方式控制同时运行的数量。

从那以后我也注意到,这条线似乎promises.push(getMethodResponse(resourceMethod, value, apiName, resourcePath))实际上是在执行承诺,而不是简单地将它们添加到数组中。似乎最后一个Promise.all()实际上并没有多大作用。

是的,当您执行时promises.push(getMethodResponse()),您会立即调用getMethodResponse()。这会立即启动异步操作。然后该函数返回一个承诺,Promise.all()并将监视该承诺(以及您放入数组中的所有其他承诺)以告诉您何时完成。就是Promise.all()这样。它监控您已经开始的操作。要将同时进行的最大请求数保持在某个阈值以下,您必须不要像您正在做的那样一次启动所有异步操作。 Promise.all()不会为您这样做。


为了让 BluebirdPromise.map()完全帮助您,您必须向它传递一个 DATA 数组,而不是 promise。当你向它传递一个代表你已经开始的异步操作的 Promise 数组时,它只能做的Promise.all()事情。但是,如果您将一个数据数组和一个回调函数传递给它,然后该函数可以为数组中的每个数据元素启动异步操作,那么它可以在您使用该concurrency选项时为您提供帮助。

您的代码非常复杂,因此我将使用一个简单的网络爬虫来说明,它想要读取大量 URL,但出于内存考虑,一次只能处理 20 个。

const rp = require('request-promise');
let urls = [...];    // large array of URLs to process

Promise.map(urls, function(url) {
    return rp(url).then(function(data) {
        // process scraped data here
        return someValue;
    });
}, {concurrency: 20}).then(function(results) {
   // process array of results here
}).catch(function(err) {
    // error here
});

在这个例子中,希望你能看到一个数据项数组被传入Promise.map()(而不是一个 Promise 数组)。然后,它允许Promise.map()管理处理数组的方式/时间,在这种情况下,它将使用该concurrency: 20设置来确保同时进行的请求不超过 20 个。


您使用的努力Promise.map()是传递一系列承诺,这对您没有帮助,因为承诺代表已经开始的异步操作:

Promise.map(promises, function() {
    ...
});

然后,此外,您确实需要通过阅读显示此错误的目标 API 上的文档或通过进行大量测试来找出导致错误的确切原因,TooManyRequestsException因为可能有多种原因可能导致此错误并且不知道正是您需要控制的内容,只需进行大量疯狂的猜测即可尝试找出可行的方法。API 可能检测到的最常见的事情是:

  1. 来自同一帐户或来源的同时请求。
  2. 来自同一帐户或来源的每单位时间的请求数(例如每秒请求数)。

中的concurrency操作Promise.map()将很容易地帮助您使用第一个选项,但不一定会帮助您使用第二个选项,因为您可以限制同时请求的数量很少,并且仍然超过每秒请求数限制。第二个需要一些实际的时间控制。插入delay()语句有时会起作用,但即使这样也不是一种非常直接的管理方法,并且会导致控制不一致(有时有效,但有时无效)或次优控制(将自己限制在远低于您的控制范围内)可以实际使用)。

要管理每秒请求的限制,您需要使用速率限制库或您自己的代码中的实际速率限制逻辑进行一些实际的时间控制。

以下是限制每秒请求数的方案示例:如何管理请求以保持低于速率限制

于 2017-11-20T14:47:17.993 回答