
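I am trying to connect a Cloud Function to an AI program running on Compute Engine that processes incoming data. The Cloud Function is currently triggered by changes to a Cloud Storage bucket and starts the VM each time data is uploaded. However, when a lot of data is uploaded at the same time, the instance is restarted multiple times, which interrupts the data processing. How can I queue the incoming data and have the Compute Engine instance read and process it once the previous run has finished? I have looked at Pub/Sub and Cloud Tasks, but the implementations I found talk about scaling across multiple instances or connecting to App Engine. Do I need to modify the analysis process so that it handles the data sequentially, or can this be done with Google Cloud tools?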

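Edit

Attached is the Cloud Function that runs when a file is uploaded to the GCS bucket. Note that the instance being triggered is always the same one. My theory is that subsequent triggers either rewrite the metadata, restart the instance immediately, or change the instance's instructions, thereby interrupting the processing of the earlier file.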

from googleapiclient.discovery import build


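def start(event, context):
    # Background function triggered by an upload to the GCS bucket.
    file = event
    print(file["id"])

    # The code below expects the id to look like
    # "<bucket>/<userId>/<paymentId>/<file name>/...".
    string = file["id"]

    newstring = string.split('/')
    userId = newstring[1]
    paymentId = newstring[2]
    name = newstring[3]

    print(name)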

    if name == "uploadcomplete.txt":
        startup_script = """#! /bin/bash
                cd ~ && pwd 1>>/var/log/log.out 2>&1
                PATH=$PATH://usr/local/cuda 1>>/var/log/log.out 2>&1
                cd program_directory 1>>/var/log/log.out 2>&1
                source /opt/anaconda3/etc/profile.d/conda.sh 1>>/var/log/log.out 2>&1
                conda activate env
                cd keras-retinanet/ 1>>/var/log/log.out 2>&1
                export PYTHONPATH=`pwd` 1>>/var/log/log.out 2>&1
                cd tracker 1>>/var/log/log.out 2>&1
                python program_name --gcs_input_path gs://input/{userId}/{paymentId} --gcs_output_path gs://output/{userId}/{paymentId} 1>>/var/log/log.out 2>&1
                sudo python3 gcs_to_mongo.py {userId} {paymentId} 1>>/var/log/log.out 2>&1
                sudo shutdown -P now
                """.format(userId=userId, paymentId=paymentId)
                
        service = build('compute', 'v1', cache_discovery=False)
        print('VM Instance starting')
        project = 'XXXX'
        zone = 'us-east1-c'
        instance = 'YYYY'
        metadata = service.instances().get(project=project, zone=zone, instance=instance)
        metares = metadata.execute()
        print(metares)
    
        fingerprint = metares["metadata"]["fingerprint"]
        print(fingerprint)
        bodydata = {"fingerprint": fingerprint,
                    "items": [{"key": "startup-script", "value": startup_script}]}

        print(bodydata)        
        meta = service.instances().setMetadata(project=project, zone=zone, instance=instance,
                                               body=bodydata)
        res = meta.execute()
        instanceget = service.instances().get(project=project, zone=zone, instance=instance).execute()
        request = service.instances().start(project=project, zone=zone, instance=instance)
        response = request.execute()
        print('VM Instance started')
        print(instanceget)
        print("New Metadata:", instanceget['metadata'])
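
One way to test the theory above might be to check the instance's status before touching its metadata, so that a trigger arriving while the VM is still busy does not overwrite the startup script or call start again. The following is only a minimal sketch under that assumption: `start_if_stopped` is a hypothetical helper that is not part of the function above, and `service` is assumed to be the object returned by `build('compute', 'v1')`.

```python
# Hypothetical helper, not part of the original function. It only illustrates
# checking the instance status before rewriting metadata and starting the VM.

def start_if_stopped(service, project, zone, instance, startup_script):
    """Set the startup script and start the VM only if it is fully stopped.

    `service` is assumed to be the object returned by
    googleapiclient.discovery.build('compute', 'v1').
    Returns True if a start was requested, False if the VM was busy.
    """
    info = service.instances().get(
        project=project, zone=zone, instance=instance).execute()

    # A stopped Compute Engine instance reports the status "TERMINATED".
    if info["status"] != "TERMINATED":
        print("Instance is", info["status"], "- skipping this trigger")
        return False

    body = {
        "fingerprint": info["metadata"]["fingerprint"],
        "items": [{"key": "startup-script", "value": startup_script}],
    }
    service.instances().setMetadata(
        project=project, zone=zone, instance=instance, body=body).execute()
    service.instances().start(
        project=project, zone=zone, instance=instance).execute()
    return True
```

This guard would only avoid interrupting a running job. An upload whose trigger is skipped is not processed later, so it would still have to be recorded somewhere (a queue, for example) for the instance to pick up afterwards.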
