我目前正在使用 python GCP API 创建云任务队列。我的代码是从示例代码修改的,逻辑是检查队列是否存在,如果不存在则创建一个新队列并将新任务放入该队列。所以我使用 try-except 和 importfrom google.api_core import exceptions
来处理错误。但是现在的问题一直是我的服务帐户没有权限云任务。这是错误。
google.api_core.exceptions.PermissionDenied
google.api_core.exceptions.PermissionDenied: 403 The principal (user or service account) lacks IAM permission "cloudtasks.tasks.create" for the resource "projects/xxxx/locations/us-central1" (or the resource may not exist).
这是我的代码。
@app.route('/train_model/<dataset_name>/<dataset_id>/', methods=["POST", "GET"])
def train_model(dataset_name,dataset_id):
if request.method == 'POST':
form = request.form
model = form.get('model_name')
date = form.get('date')
datetime_object = datetime.strptime(date, '%Y-%m-%d %H:%M:%S')
timezone = pytz.timezone('Asia/Hong_Kong')
timezone_date_time_obj = timezone.localize(datetime_object)
data=[dataset_id,model]
payload = str(data).encode()
# Create a client.
url = "https://us-central1-xxx.cloudfunctions.net/create_csv"
try:
client = tasks_v2.CloudTasksClient.from_service_account_json(
'./xxxxx.json')
url = "https://us-central1-xxxxxx.cloudfunctions.net/create_csv"
location = 'us-central1'
project = 'xxxxx'
queue = 'testing1'
parent = client.location_path(project, location)
task = {
"http_request": {
'http_method': 'POST',
'url': url,
'body': payload
}}
# set schedule time
timestamp = timestamp_pb2.Timestamp()
timestamp.FromDatetime(timezone_date_time_obj)
task['schedule_time'] = timestamp
response = client.create_task(parent, task)
except exceptions.FailedPrecondition:
location = 'us-central1'
project = 397901391776
# Create a client.
client = tasks_v2.CloudTasksClient.from_service_account_json(
"./xxxx.json")
parent = client.location_path(project, location)
queue = {"name": 'x'}
queue.update(name="projects/xxxxx/locations/us-west2/queues/" + queue #the name of the queue from try.)
response = client.create_queue(parent, queue)
parent = client.queue_path(project, location, queue)
task = {
"http_request": {
'http_method': 'POST',
'url': url,
'body':payload
}}
# set schedule time
timestamp = timestamp_pb2.Timestamp()
timestamp.FromDatetime(timezone_date_time_obj)
task['schedule_time'] = timestamp
response = client.create_task(parent, task)
print(response)
return redirect('/datasetinfo/{}/{}/'.format(dataset_name,dataset_id))