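The short answer is:
- Evaluation results must be reported by calling config:PutEvaluations (put_evaluations() in boto3), not through the Lambda's return value.
- The Lambda's return value should simply be the list of evaluations.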
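The two points above can be sketched in a few lines. This is only an illustration under assumptions: the names report_evaluations and StubConfigClient are hypothetical and are not part of the solution below, and config_client is assumed to behave like boto3.client('config').

```python
def report_evaluations(config_client, event, evaluations, test_mode=False):
    """Report evaluations to AWS Config through PutEvaluations.

    config_client is assumed to expose put_evaluations() like the boto3
    'config' client; it is passed in so the function can be tried with a stub.
    """
    config_client.put_evaluations(
        Evaluations=evaluations,
        ResultToken=event['resultToken'],
        TestMode=test_mode,
    )
    # Per the points above, the return value is not how results reach
    # AWS Config; returning the list is just convenient for logs and tests.
    return evaluations


class StubConfigClient:
    """Hypothetical stand-in for boto3.client('config'), for local experiments."""

    def __init__(self):
        self.calls = []

    def put_evaluations(self, **kwargs):
        # Record the call instead of contacting AWS.
        self.calls.append(kwargs)
        return {'FailedEvaluations': []}
```

In a real handler the stub would be replaced by the boto3 client, and event['resultToken'] would come from the invocation event that AWS Config sends.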
The long answer is that this is my working solution:
AWS Lambda function for the Config rule (language: python3.8)
'''
#####################################
## Gherkin ##
#####################################
Rule Name:
  security-group-of-rds
Description:
  Checks that all Oracle databases are using the correct security group and only that group.
Trigger:
  Configuration Change on AWS::RDS::DBInstance. Scope of changes == Resources.
Reports on:
  AWS::RDS::DBInstance
Parameters:
  |-----------------------|-----------|-----------------------------------------------|
  | Parameter Name        | Type      | Description                                   |
  |-----------------------|-----------|-----------------------------------------------|
  | vpcSecurityGroupId    | string    | Id of the required vpc Security Group.        |
  |-----------------------|-----------|-----------------------------------------------|
  | Assume-Rule-Role      | boolean   | If true, switch to the config role.           |
  |                       |           | Defaults to false.                            |
  |-----------------------|-----------|-----------------------------------------------|
  | Mode                  | Enum      | Range: Fully-Operational-DeathStar,           |
  |                       |           |        Put-Evaluations-Test,                  |
  |                       |           |        Lambda-Console-Test.                   |
  |                       |           | Defaults to Fully-Operational-DeathStar.      |
  |                       |           | Meanings:                                     |
  |                       |           |   Fully-Operational-DeathStar:                |
  |                       |           |     Normal operation.                         |
  |                       |           |   Put-Evaluations-Test:                       |
  |                       |           |     Set TestMode to True when invoking        |
  |                       |           |     put_evaluations. Refer:                   |
  |                       |           |     https://docs.aws.amazon.com/config/latest/APIReference/API_PutEvaluations.html
  |                       |           |   Lambda-Console-Test:                        |
  |                       |           |     Do not call put_evaluations() at all.     |
  |-----------------------|-----------|-----------------------------------------------|
Envars:
  |-----------------------|-----------|-----------------------------------------------|
  | Envar Name            | Type      | Description                                   |
  |-----------------------|-----------|-----------------------------------------------|
  | PROXY                 | string    | http(s) proxy. Defaults to no proxy.          |
  |-----------------------|-----------|-----------------------------------------------|
  | NO_PROXY              | comma-    | List of exemptions to proxy.                  |
  |                       | separated-| Defaults to no exemptions.                    |
  |                       | list      |                                               |
  |-----------------------|-----------|-----------------------------------------------|
  | TURN_OFF_SSL          | boolean   | Turns off SSL verification. Defaults to False.|
  |-----------------------|-----------|-----------------------------------------------|
  | REGION                | string    | Region for config service.                    |
  |                       |           | Defaults to the lambda region.                |
  |-----------------------|-----------|-----------------------------------------------|
  | CONFIG_ENDPOINT       | string    | Customised end-point for config service.      |
  |                       |           | Defaults to the standard end-point.           |
  |-----------------------|-----------|-----------------------------------------------|
Feature:
  In order to: protect the data confidentiality of Oracle (oracle-ee) RDS databases.
  As: a Developer
  I want: to ensure that all databases have the correct security group attached.
Scenarios:
  Scenario 1:
    Given: Wrong security group
    And: The group is inactive
    Then: No conclusion.
  Scenario 2:
    Given: Wrong security group
    And: The group is active
    And: type == oracle-ee
    Then: return NON_COMPLIANT
  Scenario 3:
    Given: Right security group
    And: The group is active
    And: type == oracle-ee
    Then: return COMPLIANT
  Scenario 4:
    Given: No security group
    And: type == oracle-ee
    Then: return NON_COMPLIANT
  Scenario 5:
    Given: type != oracle-ee
    Then: return NOT_APPLICABLE
Required Role Policy Statements:
  If you are not assuming the config rule role, then the lambda role needs all these
  actions, except sts:AssumeRole.
  If you ARE assuming the config rule role, then the lambda role needs the logs and sts
  actions, and the config rule role needs the logs and config actions.
  |-----------------------|-------------|---------------------|----------------------------------|
  | Action                | Resource    | Condition           | Why do we need it?               |
  |-----------------------|-------------|---------------------|----------------------------------|
  | logs:CreateLogGroup   | *           | Always              | For logging.                     |
  | logs:CreateLogStream  |             |                     |                                  |
  | logs:PutLogEvents     |             |                     |                                  |
  |-----------------------|-------------|---------------------|----------------------------------|
  | sts:AssumeRole        | Your AWS    | if Assume-Rule-Role | If you want the lambda to        |
  |                       | config role | == True             | execute in the main config role. |
  |-----------------------|-------------|---------------------|----------------------------------|
  | config:PutEvaluations | *           | Always              | To put the actual results.       |
  |-----------------------|-------------|---------------------|----------------------------------|
Inline Constants Configuration:
  |-----------------------|-----------|-----------------------------------------------|
  | Identifier            | Type      | Description                                   |
  |-----------------------|-----------|-----------------------------------------------|
  | defaultRegion         | string    | Default region, if we can't get it from the   |
  |                       |           | Lambda environment.                           |
  |-----------------------|-----------|-----------------------------------------------|
'''
import json
import datetime
import time
import boto3
import botocore
import os

# Module-level state. The names match the globals assigned in setProxyEnvironment().
proxy = None
noProxy = None
configClient = None
defaultRegion = 'ap-southeast-2'
def setEnvar( name, value):
    # Set an environment variable, or remove it when the new value is empty.
    if os.environ.get( name, '') != value:
        if value != '':
            os.environ[ name] = value
        else:
            del os.environ[ name]
def setProxyEnvironment():
    # Sometimes Lambdas sit in VPCs which require proxy forwards
    # in order to access some or all internet services.
    global proxy
    global noProxy
    proxy = os.environ.get( 'PROXY' , None)
    noProxy = os.environ.get( 'NO_PROXY', None)
    if proxy is not None:
        setEnvar( 'http_proxy' , proxy )
        setEnvar( 'https_proxy', proxy )
    if noProxy is not None:
        setEnvar( 'no_proxy' , noProxy)
def jpath( dict1, path, sep = '.', default = None):
    # Traverse a hierarchy of dictionaries, as described by a path, and find a value.
    ret = dict1
    if isinstance( path, str):
        particleList = path.split( sep)
    else:
        particleList = path
    for particle in particleList:
        if isinstance( ret, dict):
            ret = ret.get( particle, None)
        elif (isinstance( ret, list) or isinstance( ret, tuple)) and particle.isdigit():
            idx = int( particle)
            if (idx >= 0) and (idx < len(ret)):
                ret = ret[ idx]
            else:
                ret = None
        else:
            ret = None
        if ret is None:
            break
    if ret is None:
        ret = default
    return ret
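def coerceToList( val):
    # Map None to an empty list; any other value is returned unchanged.
    if val is None:
        return list()
    else:
        return val
def coerceToBoolean( val):
    # Strings are true only when they read 'true' (case-insensitive).
    if isinstance( val, str):
        return val.lower() == 'true'
    else:
        return bool( val)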
def get_region():
    # Find the region for AWS services.
    return os.environ.get( 'REGION', os.environ.get( 'AWS_REGION', defaultRegion))
def get_assume_role_credentials( role_arn):
    # Switch to a role. We need sts:AssumeRole for this.
    global proxy
    if coerceToBoolean( os.environ.get( 'TURN_OFF_SSL', False)):
        sts_client = boto3.client('sts', verify=False)
    else:
        sts_client = boto3.client('sts')
    try:
        assume_role_response = sts_client.assume_role(RoleArn=role_arn, RoleSessionName="configLambdaExecution")
        print( 'Switched role to ' + role_arn)
        return assume_role_response['Credentials']
    except botocore.exceptions.ClientError as ex:
        # Scrub error message for any internal account info leaks
        if 'AccessDenied' in ex.response['Error']['Code']:
            ex.response['Error']['Message'] = "AWS Config does not have permission to assume the IAM role."
        else:
            ex.response['Error']['Message'] = "InternalError"
            ex.response['Error']['Code'] = "InternalError"
        print(str(ex))
        raise ex
def get_client(service, event):
    # Get the AWS service client for the specified service.
    # If specified, switch roles and go through a custom service end-point.
    global proxy
    region = get_region()
    ruleRole = jpath( event, 'executionRoleArn')
    doAssumeRuleRole = coerceToBoolean( jpath( event, 'ruleParameters-parsed.Assume-Rule-Role', '.', False)) and (ruleRole is not None)
    parms = {}
    if coerceToBoolean( os.environ.get( 'TURN_OFF_SSL', False)):
        parms['verify'] = False
    if region is not None:
        parms['region_name'] = region
    if doAssumeRuleRole:
        credentials = get_assume_role_credentials( ruleRole)
        parms['aws_access_key_id' ] = credentials['AccessKeyId' ]
        parms['aws_secret_access_key'] = credentials['SecretAccessKey']
        parms['aws_session_token' ] = credentials['SessionToken' ]
    endPointEnvarName = service.upper() + '_ENDPOINT'
    endPointEnvarValue = os.environ.get( endPointEnvarName, '')
    if endPointEnvarValue != '':
        parms['endpoint_url'] = endPointEnvarValue
    return boto3.client(service, **parms)
def get_configClient( event):
    # Get the AWS 'config' service, and store it in a global singleton.
    global configClient
    if configClient is None:
        configClient = get_client( 'config', event)
    return configClient
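def initiate_Globals():
    # Reset the cached client and set up the proxy forward, if required.
    # The global declaration is needed so that the assignment resets the
    # module-level client instead of creating an unused local variable.
    global configClient
    configClient = None
    setProxyEnvironment()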
def evaluate_compliance( configuration_item, ruleParameters):
    # Evaluate the compliance of the given changed resource.
    # Return a dictionary in the standard 'evaluation' schema.
    referenceVpcSecurityGroupId = ruleParameters.get('vpcSecurityGroupId','')
    annotation = 'Ok'
    if ((jpath( configuration_item, 'configuration.engine') == 'oracle-ee') and
        (configuration_item.get('resourceType','') == 'AWS::RDS::DBInstance')):
        ok = False
        for vpcSecurityGroup in coerceToList( jpath( configuration_item, 'configuration.vpcSecurityGroups')):
            actualId = vpcSecurityGroup.get('vpcSecurityGroupId','')
            ok = ((actualId == referenceVpcSecurityGroupId) or
                  (vpcSecurityGroup.get('status','inactive') != 'active'))
            if not ok:
                # The security group was active, but was not equal to the prescribed one.
                annotation = 'Wrong security group'
                break
        if ok:
            # At least one group is attached, and every attached group is
            # either the prescribed one or not active.
            compliance_type = 'COMPLIANT'
        else:
            if referenceVpcSecurityGroupId == '':
                annotation = 'Malformed rule parameter configuration'
            if annotation == 'Ok':
                annotation = 'No security groups'
            compliance_type = 'NON_COMPLIANT'
    else:
        # This rule only deals with oracle-ee RDS databases.
        compliance_type = 'NOT_APPLICABLE'
    evaluation = dict()
    evaluation['ComplianceResourceType'] = configuration_item['resourceType']
    evaluation['ComplianceResourceId' ] = configuration_item['resourceId']
    evaluation['OrderingTimestamp' ] = configuration_item['configurationItemCaptureTime']
    evaluation['ComplianceType' ] = compliance_type
    evaluation['Annotation' ] = annotation
    return evaluation
def printEnvars( envarList):
    # Log the value of each listed environment variable that is set.
    for envarName in envarList.split(','):
        envarValue = os.environ.get( envarName, None)
        if envarValue is not None:
            print( f'Envar {envarName} == {envarValue}')
def lambda_handler(event, context):
    global configClient
    # Phase 1: Setup and parsing input.
    # Uncomment this when debugging:
    # print( 'event == ' + json.dumps( event))
    printEnvars( 'PROXY,NO_PROXY,TURN_OFF_SSL,REGION,CONFIG_ENDPOINT')
    initiate_Globals()
    invokingEvent = json.loads( event.get('invokingEvent','{}'))
    event['invokingEvent-parsed'] = invokingEvent
    ruleParameters = json.loads( event.get('ruleParameters','{}'))
    event['ruleParameters-parsed'] = ruleParameters
    print( 'Config rule Arn == ' + event.get( 'configRuleArn', ''))
    print( 'Rule parameters == ' + json.dumps( ruleParameters))
    get_configClient( event)
    configuration_item = invokingEvent['configurationItem']
    # Phase 2: Evaluation.
    evaluation = evaluate_compliance( configuration_item, ruleParameters)
    # Phase 3: Reporting.
    evaluations = list()
    evaluations.append( evaluation)
    mode = ruleParameters.get( 'Mode', 'Fully-Operational-DeathStar')
    if mode == 'Fully-Operational-DeathStar':
        response = configClient.put_evaluations( Evaluations=evaluations, ResultToken=event['resultToken'])
    elif mode == 'Put-Evaluations-Test':
        response = configClient.put_evaluations( Evaluations=evaluations, ResultToken=event['resultToken'], TestMode=True)
    else:
        response = {'mode': mode}
    # Uncomment this when debugging:
    # print( 'response == ' + json.dumps( response))
    print( 'evaluations == ' + json.dumps( evaluations))
    return evaluations
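To try the handler from the Lambda console with Mode set to Lambda-Console-Test, a cut-down test event along the following lines might be enough. This is a sketch under assumptions: make_test_event is a hypothetical helper, the resource id, timestamp and security group ids are placeholders, and only the fields that the handler above reads are filled in (a real AWS Config event carries more).

```python
import json


def make_test_event(engine, security_groups, required_group_id,
                    mode='Lambda-Console-Test'):
    """Build a minimal AWS Config style invocation event for testing.

    security_groups is a list of (group_id, status) pairs.
    All identifiers and the timestamp are placeholder values.
    """
    configuration_item = {
        'resourceType': 'AWS::RDS::DBInstance',
        'resourceId': 'db-EXAMPLE',  # placeholder
        'configurationItemCaptureTime': '2020-01-01T00:00:00.000Z',  # placeholder
        'configuration': {
            'engine': engine,
            'vpcSecurityGroups': [
                {'vpcSecurityGroupId': group_id, 'status': status}
                for group_id, status in security_groups
            ],
        },
    }
    return {
        # These two fields are JSON strings, which is why the handler
        # calls json.loads() on them.
        'invokingEvent': json.dumps({'configurationItem': configuration_item}),
        'ruleParameters': json.dumps({
            'vpcSecurityGroupId': required_group_id,
            'Mode': mode,
        }),
        'resultToken': 'TESTMODE',  # placeholder; unused in Lambda-Console-Test mode
        'configRuleArn': '',
    }


event = make_test_event('oracle-ee', [('sg-11111111', 'active')], 'sg-11111111')
print(json.dumps(event, indent=2))
```

Pasting the printed JSON into a Lambda console test event should exercise scenario 3 (right security group, active, oracle-ee) without any call to put_evaluations(), provided the Mode parameter is handled as in the handler above.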