1

我想编写一个 Python 脚本,该脚本将使用Compute Engine REST API更新或添加新规则到Google Cloud Platform中的Cloud Armor

但我有几个疑问:

  • Python的官方Google Cloud 客户端库是否适合此目的(如果不是您建议的)?

  • 为此,我应该安装哪个Python 包?

  • 标准身份验证(包含我的私钥并正确设置环境变量的JSON 文件GOOGLE_APPLICATION_CREDENTIALS)是否足以连接到此 API 以达到我想要实现的目的?

我在问路。

4

2 回答 2

1

Python 脚本将是一个不错的方法,您可以使用Google APIs Python 客户端库来实现这一点。

适用于 Python 的 Google Cloud 客户端库可能还不够,但 Google Cloud 客户端库是用于调用 Google Cloud API 的最新推荐的客户端库。

Compute Engine v1 securityPolicies REST API 资源提供了您需要的方法列表,例如添加新规则addRule、检索getRulepatchRule更新现有规则。

查看Compute Engine API 的 PyDoc 参考以获取完整的方法列表以及如何使用它们的说明。

此外,您需要使用Python 包(Google API Python 客户端库)。

您提到的标准身份验证提供了一种有用的方法,用于通过 Python 客户端库授权对 Compute Engine API 的请求。

根据官方文档:GCP 客户端库使用称为应用程序默认凭据 (ADC) 的策略来查找应用程序的凭据。当您的代码使用客户端库时,该策略会按以下顺序检查您的凭据:

  1. 首先,ADC 检查是否设置了环境变量 GOOGLE_APPLICATION_CREDENTIALS。如果设置了变量,ADC 将使用变量指向的服务帐户文件。下一节将介绍如何设置环境变量。
  2. 如果未设置环境变量,ADC 将使用 Compute Engine、Kubernetes Engine、App Engine 和 Cloud Functions 为在这些服务上运行的应用程序提供的默认服务帐号。
  3. 如果 ADC 无法使用上述任一凭据,则会发生错误。

最后,确保您选择使用的用户帐户具有在计算引擎上配置 Cloud Armor 所需的正确IAM 权限。

于 2019-06-18T22:15:57.167 回答
1

import googleapiclient.discovery
from google.oauth2 import service_account
from pprint import pprint
import logging

if LOCAL == 'True':
    credentials = service_account.Credentials.from_service_account_file(
        'path-to-service-account.json')
    compute = googleapiclient.discovery.build('compute',
                                              'v1',
                                              credentials=credentials)
else:
    compute = googleapiclient.discovery.build('compute',
                                              'v1')


class cloud_armor():

    def __init__(self, domain):
        self.domain = domain
        self.project_id = <PROJECT_ID>
        self.policy_name = <POLICY_NAME>

    def add_rule(self):
        #Find minimum current priority
        current_policy = cloud_armor.get_policy(self)
        current_rules = current_policy['rules']
        rule_priorities = []
        for rule in current_rules:
            rule_priorities.append(rule['priority'])
        priority = int(min(rule_priorities)) - 1

        body = {
            "description": "{}".format(self.domain),
            "priority": priority,
            "match": {
                "expr": {
                    "expression": "request.headers['referer']=="{}"".format(
                        self.domain)
                }
            },
            "action": "allow",
            "preview": False,
            "kind": "compute#securityPolicy"
        }
        try:
            policies = compute.securityPolicies()
            rule = policies.addRule(project=self.project_id,
                                    securityPolicy=self.policy_name,
                                    body=body
                                    ).execute()
            return rule
        except Exception as err:
            for i in range(0, len(err.args)):
                logging.error(err.args[i])
                pprint(err.args[i])
            print("==Policy Rule Failed to Add==")
            raise Exception("Policy Rule Failed to Add")

    def get_policy(self):
        try:
            policies = compute.securityPolicies()
            policy = policies.get(project=self.project_id,
                                  securityPolicy=self.policy_name
                                  ).execute()
            return policy
        except Exception as err:
            for i in range(err.args):
                logging.error(err.args[i])
                pprint(err.args[i])
            print("==Failed to Fetch Policy==")
            raise Exception("Failed to Fetch Policy")
于 2020-11-23T10:45:57.730 回答