我正在尝试更新一个不久前创建的使用 nodejs 的工具(我不是 JS 开发人员,所以我试图将代码拼凑在一起)并且卡在最后一个障碍。
新功能将采用 swagger .json 定义,使用'aws-sdk'
适用于 JS 的开发工具包将端点与 AWS 服务上匹配的 API 网关进行比较,然后相应地更新网关。
代码在一个小的定义文件(大约 15 个端点)上运行良好,但是一旦我给它一个更大的定义文件,我就开始收到大量TooManyRequestsException
错误。
我了解这是由于我对 API Gateway 服务的调用太快,需要延迟/暂停。这就是我卡住的地方
我试过添加;
- 每个返回的承诺的延迟()
- 在每个承诺中运行 setTimeout()
- 为 Promise.all 和 Promise.mapSeries 添加延迟
目前我的代码遍历定义中的每个端点,然后将每个承诺的响应添加到一个承诺数组:
promises.push(getMethodResponse(resourceMethod, value, apiName, resourcePath));
循环完成后,我运行以下命令:
return Promise.all(promises)
.catch((err) => {
winston.error(err);
})
我用 mapSeries 尝试过同样的方法(不走运)。
看起来 ( getMethodResponse
promise) 中的函数会立即运行,因此,无论我添加什么类型的延迟,它们都仍然只是执行。我的怀疑是我需要让 ( getMethodResponse
) 返回一个函数,然后使用 mapSeries,但我也无法让它工作。
我试过的代码:包装getMethodResponse
在这个:
return function(value){}
然后在循环之后添加这个(并且在循环内 - 没有区别):
Promise.mapSeries(function (promises) {
return 'a'();
}).then(function (results) {
console.log('result', results);
});
还尝试了许多其他建议:
请问有什么建议吗?
编辑
根据要求,一些额外的代码可以尝试查明问题。
当前使用一小组端点的代码(在 Swagger 文件中):
module.exports = (apiName, externalUrl) => {
return getSwaggerFromHttp(externalUrl)
.then((swagger) => {
let paths = swagger.paths;
let resourcePath = '';
let resourceMethod = '';
let promises = [];
_.each(paths, function (value, key) {
resourcePath = key;
_.each(value, function (value, key) {
resourceMethod = key;
let statusList = [];
_.each(value.responses, function (value, key) {
if (key >= 200 && key <= 204) {
statusList.push(key)
}
});
_.each(statusList, function (value, key) { //Only for 200-201 range
//Working with small set
promises.push(getMethodResponse(resourceMethod, value, apiName, resourcePath))
});
});
});
//Working with small set
return Promise.all(promises)
.catch((err) => {
winston.error(err);
})
})
.catch((err) => {
winston.error(err);
});
};
从那以后,我尝试添加它来代替 return Promise.all():
Promise.map(promises, function() {
// Promise.map awaits for returned promises as well.
console.log('X');
},{concurrency: 5})
.then(function() {
return console.log("y");
});
结果是这样的(每个端点都是一样的,有很多):
错误:TooManyRequestsException:请求过多 X 错误:TooManyRequestsException:请求过多 X 错误:TooManyRequestsException:请求过多
AWS SDK 在每个 Promise 中被调用 3 次,其功能是(从 getMethodResponse() 函数启动):
apigateway.getRestApisAsync()
return apigateway.getResourcesAsync(resourceParams)
apigateway.getMethodAsync(params, function (err, data) {}
典型的 AWS 开发工具包文档指出,这是进行太多连续调用(太快)时的典型行为。我过去遇到过类似的问题,只需在被调用的代码中添加 .delay(500) 即可解决;
就像是:
return apigateway.updateModelAsync(updateModelParams)
.tap(() => logger.verbose(`Updated model ${updatedModel.name}`))
.tap(() => bar.tick())
.delay(500)
编辑#2
我想以彻底的名义,包括我的整个.js
文件。
'use strict';
const AWS = require('aws-sdk');
let apigateway, lambda;
const Promise = require('bluebird');
const R = require('ramda');
const logger = require('../logger');
const config = require('../config/default');
const helpers = require('../library/helpers');
const winston = require('winston');
const request = require('request');
const _ = require('lodash');
const region = 'ap-southeast-2';
const methodLib = require('../aws/methods');
const emitter = require('../library/emitter');
emitter.on('updateRegion', (region) => {
region = region;
AWS.config.update({ region: region });
apigateway = new AWS.APIGateway({ apiVersion: '2015-07-09' });
Promise.promisifyAll(apigateway);
});
function getSwaggerFromHttp(externalUrl) {
return new Promise((resolve, reject) => {
request.get({
url: externalUrl,
header: {
"content-type": "application/json"
}
}, (err, res, body) => {
if (err) {
winston.error(err);
reject(err);
}
let result = JSON.parse(body);
resolve(result);
})
});
}
/*
Deletes a method response
*/
function deleteMethodResponse(httpMethod, resourceId, restApiId, statusCode, resourcePath) {
let methodResponseParams = {
httpMethod: httpMethod,
resourceId: resourceId,
restApiId: restApiId,
statusCode: statusCode
};
return apigateway.deleteMethodResponseAsync(methodResponseParams)
.delay(1200)
.tap(() => logger.verbose(`Method response ${statusCode} deleted for path: ${resourcePath}`))
.error((e) => {
return console.log(`Error deleting Method Response ${httpMethod} not found on resource path: ${resourcePath} (resourceId: ${resourceId})`); // an error occurred
logger.error('Error: ' + e.stack)
});
}
/*
Deletes an integration response
*/
function deleteIntegrationResponse(httpMethod, resourceId, restApiId, statusCode, resourcePath) {
let methodResponseParams = {
httpMethod: httpMethod,
resourceId: resourceId,
restApiId: restApiId,
statusCode: statusCode
};
return apigateway.deleteIntegrationResponseAsync(methodResponseParams)
.delay(1200)
.tap(() => logger.verbose(`Integration response ${statusCode} deleted for path ${resourcePath}`))
.error((e) => {
return console.log(`Error deleting Integration Response ${httpMethod} not found on resource path: ${resourcePath} (resourceId: ${resourceId})`); // an error occurred
logger.error('Error: ' + e.stack)
});
}
/*
Get Resource
*/
function getMethodResponse(httpMethod, statusCode, apiName, resourcePath) {
let params = {
httpMethod: httpMethod.toUpperCase(),
resourceId: '',
restApiId: ''
}
return getResourceDetails(apiName, resourcePath)
.error((e) => {
logger.unimportant('Error: ' + e.stack)
})
.then((result) => {
//Only run the comparrison of models if the resourceId (from the url passed in) is found within the AWS Gateway
if (result) {
params.resourceId = result.resourceId
params.restApiId = result.apiId
var awsMethodResponses = [];
try {
apigateway.getMethodAsync(params, function (err, data) {
if (err) {
if (err.statusCode == 404) {
return console.log(`Method ${params.httpMethod} not found on resource path: ${resourcePath} (resourceId: ${params.resourceId})`); // an error occurred
}
console.log(err, err.stack); // an error occurred
}
else {
if (data) {
_.each(data.methodResponses, function (value, key) {
if (key >= 200 && key <= 204) {
awsMethodResponses.push(key)
}
});
awsMethodResponses = _.pull(awsMethodResponses, statusCode); //List of items not found within the Gateway - to be removed.
_.each(awsMethodResponses, function (value, key) {
if (data.methodResponses[value].responseModels) {
var existingModel = data.methodResponses[value].responseModels['application/json']; //Check if there is currently a model attached to the resource / method about to be deleted
methodLib.updateResponseAssociation(params.httpMethod, params.resourceId, params.restApiId, statusCode, existingModel); //Associate this model to the same resource / method, under the new response status
}
deleteMethodResponse(params.httpMethod, params.resourceId, params.restApiId, value, resourcePath)
.delay(1200)
.done();
deleteIntegrationResponse(params.httpMethod, params.resourceId, params.restApiId, value, resourcePath)
.delay(1200)
.done();
})
}
}
})
.catch(err => {
console.log(`Error: ${err}`);
});
}
catch (e) {
console.log(`getMethodAsync failed, Error: ${e}`);
}
}
})
};
function getResourceDetails(apiName, resourcePath) {
let resourceExpr = new RegExp(resourcePath + '$', 'i');
let result = {
apiId: '',
resourceId: '',
path: ''
}
return helpers.apiByName(apiName, AWS.config.region)
.delay(1200)
.then(apiId => {
result.apiId = apiId;
let resourceParams = {
restApiId: apiId,
limit: config.awsGetResourceLimit,
};
return apigateway.getResourcesAsync(resourceParams)
})
.then(R.prop('items'))
.filter(R.pipe(R.prop('path'), R.test(resourceExpr)))
.tap(helpers.handleNotFound('resource'))
.then(R.head)
.then([R.prop('path'), R.prop('id')])
.then(returnedObj => {
if (returnedObj.id) {
result.path = returnedObj.path;
result.resourceId = returnedObj.id;
logger.unimportant(`ApiId: ${result.apiId} | ResourceId: ${result.resourceId} | Path: ${result.path}`);
return result;
}
})
.catch(err => {
console.log(`Error: ${err} on API: ${apiName} Resource: ${resourcePath}`);
});
};
function delay(t) {
return new Promise(function(resolve) {
setTimeout(resolve, t)
});
}
module.exports = (apiName, externalUrl) => {
return getSwaggerFromHttp(externalUrl)
.then((swagger) => {
let paths = swagger.paths;
let resourcePath = '';
let resourceMethod = '';
let promises = [];
_.each(paths, function (value, key) {
resourcePath = key;
_.each(value, function (value, key) {
resourceMethod = key;
let statusList = [];
_.each(value.responses, function (value, key) {
if (key >= 200 && key <= 204) {
statusList.push(key)
}
});
_.each(statusList, function (value, key) { //Only for 200-201 range
promises.push(getMethodResponse(resourceMethod, value, apiName, resourcePath))
});
});
});
//Working with small set
return Promise.all(promises)
.catch((err) => {
winston.error(err);
})
})
.catch((err) => {
winston.error(err);
});
};