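I'm trying to connect a Cloud Function to an AI program running on Compute Engine that processes incoming data. The Cloud Function is currently triggered by changes to a Cloud Storage bucket, and every upload starts the VM. However, when a lot of data is uploaded at the same time, the instance is restarted multiple times, which interrupts the data processing. How can I queue the incoming data and have the Compute Engine instance read and process the next item only after the previous process has finished? I've looked at Pub/Sub and Cloud Tasks, but the implementations I found talk about scaling out to multiple instances or connecting to App Engine. Do I need to change the data-analysis process so that it handles data sequentially, or can this be done with Google Cloud tools?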
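The behavior being asked for is a single consumer draining a first-in, first-out queue. Below is a local, standard-library-only sketch of that pattern. `queue.Queue` stands in for a managed queue (for example a Pub/Sub subscription that the VM pulls from, which is an assumption and not something the question establishes), and `process_upload` is a hypothetical placeholder for the AI program.

```python
import queue
import threading
import time


def process_upload(job):
    """Hypothetical stand-in for the AI program; it only simulates work."""
    time.sleep(0.01)
    return f"processed {job}"


def run_worker(jobs, results):
    """Take one job at a time; the next job starts only after the previous one returns."""
    while True:
        job = jobs.get()
        if job is None:  # sentinel: no more work
            jobs.task_done()
            break
        results.append(process_upload(job))
        jobs.task_done()


jobs = queue.Queue()
results = []
worker = threading.Thread(target=run_worker, args=(jobs, results))
worker.start()

# Several uploads arriving "at the same time" are simply enqueued, not started.
for upload in ["user1/pay1", "user1/pay2", "user2/pay3"]:
    jobs.put(upload)

jobs.put(None)
worker.join()
print(results)
```

With a single worker, simultaneous uploads never interrupt each other; they only wait longer in the queue.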
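EDIT: Below is the Cloud Function that runs when a file is uploaded to the GCS bucket. Note that it always triggers the same instance. My theory is that each subsequent trigger overwrites the metadata, or immediately restarts the instance, or changes the instance's instructions, and this interrupts the processing of the previous file.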
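The overwrite part of that theory can be illustrated with a toy model: a plain dict stands in for the instance metadata, and the hypothetical `trigger` function mimics one Cloud Function run writing the single `startup-script` key. It models only the "last writer wins" effect, not the real VM lifecycle.

```python
def trigger(metadata, user_id, payment_id):
    """Hypothetical model of one Cloud Function run replacing the single startup-script entry."""
    metadata["startup-script"] = (
        f"python program_name --gcs_input_path gs://input/{user_id}/{payment_id}"
    )


instance_metadata = {}
trigger(instance_metadata, "user1", "pay1")
trigger(instance_metadata, "user2", "pay2")  # second upload lands before the first job runs

# Only the most recent job is still referenced; the first one is lost.
print(instance_metadata["startup-script"])
```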
from googleapiclient.discovery import build


def start(event, context):
    """Background Cloud Function triggered when an object is uploaded to the GCS bucket."""
    file = event
    print(file["id"])
    # For a GCS object, "id" is "<bucket>/<userId>/<paymentId>/<file name>/<generation>".
    string = file["id"]
    newstring = string.split('/')
    userId = newstring[1]
    paymentId = newstring[2]
    name = newstring[3]
    print(name)
    # Only start processing once the marker file for this upload arrives.
    if name == "uploadcomplete.txt":
        # The script must start at column 0 so "#!" is the first line of the file.
        startup_script = """#! /bin/bash
cd ~ && pwd 1>>/var/log/log.out 2>&1
PATH=$PATH://usr/local/cuda 1>>/var/log/log.out 2>&1
cd program_directory 1>>/var/log/log.out 2>&1
source /opt/anaconda3/etc/profile.d/conda.sh 1>>/var/log/log.out 2>&1
conda activate env
cd keras-retinanet/ 1>>/var/log/log.out 2>&1
export PYTHONPATH=`pwd` 1>>/var/log/log.out 2>&1
cd tracker 1>>/var/log/log.out 2>&1
python program_name --gcs_input_path gs://input/{userId}/{paymentId} --gcs_output_path gs://output/{userId}/{paymentId} 1>>/var/log/log.out 2>&1
sudo python3 gcs_to_mongo.py {userId} {paymentId} 1>>/var/log/log.out 2>&1
sudo shutdown -P now
""".format(userId=userId, paymentId=paymentId)
        service = build('compute', 'v1', cache_discovery=False)
        print('VM Instance starting')
        project = 'XXXX'
        zone = 'us-east1-c'
        instance = 'YYYY'
        # Read the current metadata; its fingerprint is required by setMetadata.
        metadata = service.instances().get(project=project, zone=zone, instance=instance)
        metares = metadata.execute()
        print(metares)
        fingerprint = metares["metadata"]["fingerprint"]
        print(fingerprint)
        # setMetadata replaces the whole item list, so every trigger overwrites
        # the startup-script written by the previous trigger.
        bodydata = {"fingerprint": fingerprint,
                    "items": [{"key": "startup-script", "value": startup_script}]}
        print(bodydata)
        meta = service.instances().setMetadata(project=project, zone=zone, instance=instance,
                                               body=bodydata)
        res = meta.execute()
        instanceget = service.instances().get(project=project, zone=zone, instance=instance).execute()
        # Startup scripts run at boot, so the new script only takes effect when the VM boots.
        request = service.instances().start(project=project, zone=zone, instance=instance)
        response = request.execute()
        print('VM Instance started')
        print(instanceget)
        print("New Metadata:", instanceget['metadata'])
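The function takes `userId`, `paymentId` and the file name from positions 1 to 3 of `event["id"]`, which for a GCS object has the form `<bucket>/<object name>/<generation>`. A slightly more robust sketch reads the object name from `event["name"]` (which excludes the bucket and generation) and checks its depth. `parse_upload_event` and the sample event values are hypothetical, and the three-level path layout is assumed from the original indexing.

```python
def parse_upload_event(event):
    """Split an object path of the assumed form "<userId>/<paymentId>/<file name>"."""
    parts = event["name"].split("/")
    if len(parts) != 3:
        raise ValueError(f"unexpected object path: {event['name']!r}")
    user_id, payment_id, file_name = parts
    return user_id, payment_id, file_name


# Hypothetical sample event, shaped like a GCS object resource.
event = {
    "bucket": "input",
    "name": "user123/pay456/uploadcomplete.txt",
    "id": "input/user123/pay456/uploadcomplete.txt/1600000000000000",
}
print(parse_upload_event(event))
```

Validating the depth makes an upload to an unexpected path fail loudly instead of silently picking the wrong path segments.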