编辑:仅当您因任何原因无法拥有内部 API 网关时,我才推荐以下解决方案。
使用下面的代码,我可以直接成功调用另一个 Chalice Lambda(无需再次通过 API Gateway + Authorizer):
def invoke_sync(lambda_name: str,
http_method: str,
path: str,
claims: dict,
path_parameters: dict = None,
http_request_body: str = None,
query_string_parameters: dict = None,
headers: dict = None):
invoke_payload = {
'path': path,
'httpMethod': http_method,
'headers': headers,
'multiValueHeaders': {},
'queryStringParameters': query_string_parameters,
'multiValueQueryStringParameters': None,
'pathParameters': path_parameters,
'stageVariables': None,
'requestContext': {
'authorizer': {'claims': claims},
'path': path,
'resourcePath': path,
'httpMethod': http_method,
},
'body': http_request_body,
}
lambda_response = boto3.client('lambda').invoke(FunctionName=lambda_name,
InvocationType='RequestResponse',
Payload=json.dumps(invoke_payload))
payload = json.loads(lambda_response['Payload'].read())
status_code = payload['statusCode']
...
使用示例:
invoke_sync(
lambda_name='Users',
claims=<claims-in-the-id-token>,
http_method='GET',
path='/users/{userId}',
path_parameters={'userId': 123}
)
这个调用是同步的。要创建上述的异步版本,请InvocationType='Event'
按照此处记录的方式使用。
请注意,如果使用其他语言或框架,上面使用的有效负载将完全相同,因为这是从 API 网关发送到 Lambda 函数的格式。