import logging
import os
from pprint import pprint

import googleapiclient.discovery
from google.oauth2 import service_account
# Build the module-level Compute Engine API client used by ``cloud_armor``.
#
# LOCAL is read from the environment because nothing else in this module
# defines it; it is compared against the literal string 'True', so any other
# value (or leaving it unset) selects the non-local branch.
LOCAL = os.environ.get('LOCAL', 'False')

if LOCAL == 'True':
    # Local runs authenticate explicitly with a service-account key file.
    # 'path-to-service-account.json' is a placeholder path to be replaced.
    credentials = service_account.Credentials.from_service_account_file(
        'path-to-service-account.json')
    compute = googleapiclient.discovery.build('compute',
                                              'v1',
                                              credentials=credentials)
else:
    # No explicit credentials are passed here, so the client library is left
    # to resolve them from the runtime environment.
    compute = googleapiclient.discovery.build('compute',
                                              'v1')
class cloud_armor():
    """Manage referer-based allow rules on a Cloud Armor security policy.

    All API calls go through the module-level ``compute`` client.

    Args:
        domain: Value the request's ``referer`` header must equal for the
            rule created by :meth:`add_rule` to match. Also used as the
            rule description.
        project_id: Project that owns the security policy. The default is a
            template placeholder and must be replaced (or overridden by the
            caller) before any API call can succeed.
        policy_name: Name of the security policy to read and modify. The
            default is a template placeholder, as for ``project_id``.
    """

    # Largest priority number a security policy rule can carry; used as the
    # starting point when the fetched policy lists no rules at all.
    _MAX_PRIORITY = 2147483647

    def __init__(self, domain, project_id="<PROJECT_ID>",
                 policy_name="<POLICY_NAME>"):
        self.domain = domain
        self.project_id = project_id
        self.policy_name = policy_name

    @staticmethod
    def _log_failure(err, banner):
        """Log and pretty-print every argument of *err*, then print *banner*."""
        for arg in err.args:
            logging.error(arg)
            pprint(arg)
        print(banner)

    def add_rule(self):
        """Add an ``allow`` rule for ``self.domain`` ahead of existing rules.

        The new rule's priority is one less than the smallest priority
        currently present in the policy, so it is numerically lower than
        every existing rule.

        Returns:
            The response of the ``securityPolicies.addRule`` call.

        Raises:
            ValueError: If the smallest existing priority is already 0, so
                no lower non-negative priority is available.
            Exception: ``"Failed to Fetch Policy"`` if the current policy
                cannot be read, or ``"Policy Rule Failed to Add"`` if the
                addRule request fails. The original error is chained as
                ``__cause__``.
        """
        # Find minimum current priority. Convert before comparing so the
        # minimum is numeric even if the API ever returns priorities as text.
        current_policy = self.get_policy()
        rule_priorities = [
            int(rule['priority'])
            for rule in current_policy.get('rules', [])
        ]
        priority = min(rule_priorities, default=self._MAX_PRIORITY) - 1
        if priority < 0:
            raise ValueError(
                "No free priority below the current minimum in policy "
                "{!r}".format(self.policy_name))

        body = {
            "description": "{}".format(self.domain),
            "priority": priority,
            "match": {
                "expr": {
                    # Expression sent to the API:
                    #   request.headers['referer']=="<domain>"
                    "expression": "request.headers['referer']==\"{}\"".format(
                        self.domain)
                }
            },
            "action": "allow",
            "preview": False,
            # The request body is a single rule, not a whole policy.
            "kind": "compute#securityPolicyRule"
        }

        try:
            policies = compute.securityPolicies()
            rule = policies.addRule(project=self.project_id,
                                    securityPolicy=self.policy_name,
                                    body=body
                                    ).execute()
        except Exception as err:
            self._log_failure(err, "==Policy Rule Failed to Add==")
            raise Exception("Policy Rule Failed to Add") from err
        return rule

    def get_policy(self):
        """Fetch the security policy named by ``self.policy_name``.

        Returns:
            The response of the ``securityPolicies.get`` call.

        Raises:
            Exception: ``"Failed to Fetch Policy"`` if the request fails for
                any reason; the original error is chained as ``__cause__``.
        """
        try:
            policies = compute.securityPolicies()
            policy = policies.get(project=self.project_id,
                                  securityPolicy=self.policy_name
                                  ).execute()
        except Exception as err:
            self._log_failure(err, "==Failed to Fetch Policy==")
            raise Exception("Failed to Fetch Policy") from err
        return policy