0

我有一个自定义培训作业,我使用 Cloud Scheduler 按固定时间表运行。当我使用 Python 客户端或 gcp 创建作业时,作业运行良好。但是,当我使用 Java SDK 创建云调度程序作业时,该作业被创建但它失败了。我在 Cloud Logging 中收到的错误消息摘要是:

{"@type":"type.googleapis.com/google.cloud.scheduler.logging.AttemptFinished", "jobName":"projects/{my_project_id}/locations/us-central1/jobs/java_job", "status":"INVALID_ARGUMENT", "targetType":"HTTP", "url":"https://us-central1-aiplatform.googleapis.com/v1/projects/{my_project_id}/locations/us-central1/customJobs"}

我查看了在 gcp 中创建的作业,三个作业的所有字段(使用 python 客户端创建的一个、使用 java SDK 创建的一个和直接在 gcp 中创建的一个)都是相同的。我无法弄清楚为什么使用 Java SDK 创建的作业总是失败。

Java SDK 代码:

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;

import com.google.cloud.scheduler.v1.Job;
import com.google.cloud.scheduler.v1.LocationName;
import com.google.cloud.scheduler.v1.OAuthToken;
import com.google.protobuf.ByteString;
import com.google.cloud.scheduler.v1.CloudSchedulerClient;
import com.google.cloud.scheduler.v1.HttpMethod;
import com.google.cloud.scheduler.v1.HttpTarget;


public class Temp 
{
    
    static String projectId = "...";
    static String location = "...";
    static String serviceAccountEmail = "...-compute@developer.gserviceaccount.com";
    static String outputUriPrefix = "gs://.../.../";
    static String imageUri = String.format("%s-docker.pkg.dev/%s/.../...", location, projectId);
    
    static String trainingJobName = "custom_training_job";
    static String schedulerJobName = String.format("projects/%s/locations/%s/jobs/java_job", projectId, location);
    static String scope = "https://www.googleapis.com/auth/cloud-platform";
    static String httpTargetUri = String.format("https://%s-aiplatform.googleapis.com/v1/projects/%s/locations/%s/customJobs", 
            location, projectId, location);
    static String machineType = "n1-standard-4";
    static long replicaCount = 1;
    
    
    static String getJobBody() throws JSONException {
        JSONObject jobBody = new JSONObject();
        jobBody.put("display_name", trainingJobName);
        JSONObject base_output_directory = new JSONObject();
        base_output_directory.put("output_uri_prefix", outputUriPrefix);
        jobBody.put("base_output_directory", base_output_directory);
        JSONObject jobSpec = new JSONObject();
        JSONArray worker_pool_specs = new JSONArray();
        JSONObject spec = new JSONObject();
        spec.put("replica_count", replicaCount);
        JSONObject machine_spec = new JSONObject();
        machine_spec.put("machine_type", machineType);
        spec.put("machine_spec", machine_spec);
        JSONObject container_spec = new JSONObject();
        container_spec.put( "image_uri", imageUri);
        JSONArray args = new JSONArray();
        args.put("--msg=hello!");
        container_spec.put( "args", args);
        spec.put("container_spec", container_spec);
        worker_pool_specs.put(spec);
        jobSpec.put("worker_pool_specs", worker_pool_specs);
        jobBody.put("job_spec", jobSpec);
        return jobBody.toString();
    }
    
    public static void main( String[] args ) throws IOException, JSONException
    {
        System.out.println(String.format("=======STARTING APPLICATION, version %s =======", "v5"));
        
        CloudSchedulerClient client = CloudSchedulerClient.create();
        
        String parent = LocationName.of(projectId, location).toString();
        
        Map<String, String> headers = new HashMap<String, String>();
        headers.put("User-Agent", "Google-Cloud-Scheduler");
        headers.put("Content-Type", "application/json; charset=utf-8");
        
        OAuthToken token = OAuthToken.newBuilder()
                .setServiceAccountEmail(serviceAccountEmail)
                .setScope(scope)
                .build();       
                
        HttpTarget httpTarget = HttpTarget.newBuilder()
                .setUri(httpTargetUri)
                .setHttpMethod(HttpMethod.POST)
                .putAllHeaders(headers)
                .setBody(ByteString.copyFromUtf8(getJobBody()))
                .setOauthToken(token)
                .build();   
        
        Job job = Job.newBuilder()
                .setName(schedulerJobName)
                .setDescription("test java job")
                .setSchedule("* * * * *")
                .setTimeZone("Africa/Abidjan")
                .setHttpTarget(httpTarget)
                .build();
        
        client.createJob(parent, job);
        client.close();
    }
}

Python客户端代码:

from google.cloud import scheduler
import json


project_id = "..."
location = "..."
service_account_email = "...-compute@developer.gserviceaccount.com"
output_uri_prefix="gs://.../.../"
image_uri=f'{location}-docker.pkg.dev/{project_id}/.../...'

traning_job__name ="custom_training_job"
scheduler_job_name = f'projects/{project_id}/locations/{location}/jobs/python_job'
scope = "https://www.googleapis.com/auth/cloud-platform"
http_target_uri = f'https://{location}-aiplatform.googleapis.com/v1/projects/{project_id}/locations/{location}/customJobs'
machine_type = "n1-standard-4"
replica_count = 1


job_spec = {
    "display_name": traning_job__name,
    "job_spec": {
            "worker_pool_specs": [
                {
                    "machine_spec": {
                        "machine_type": machine_type,
                    },
                    "replica_count": replica_count,
                    "container_spec": {
                        "image_uri": image_uri,
                        "args": [
                            "--msg=hello!"
                        ]
                    }
                }
            ],
        "base_output_directory": {
            "output_uri_prefix": output_uri_prefix
        }
    }
}


job = {
  "name": scheduler_job_name,
  "description": "Created from Python client",
  "http_target": {
    "uri": http_target_uri,
    "http_method": "POST",
    "headers": {
      "User-Agent": "Google-Cloud-Scheduler",
      "Content-Type": "application/json; charset=utf-8"
    },
    "body": json.dumps(job_spec).encode('utf-8'),
    "oauth_token": {
      "service_account_email": service_account_email,
      "scope": scope
    }
  },
  "schedule": "* * * * *",
  "time_zone": "Africa/Abidjan"
}


client = scheduler.CloudSchedulerClient()
parent = f'projects/{project_id}/locations/{location}' 
response = client.create_job(parent = parent, job = job)

编辑

问题是在getJobBody函数中,我设置base_output_directory为顶级字段,而它应该是job_spec. 问题已解决,但有更好的方法吗?我知道有一个 CustomJobSpec 类,但找不到将其转换为 Json 样式字符串的方法。

4

1 回答 1

1

如编辑中所述,问题在于在 getJobBody 函数中,base_output_directory 被设置为顶级字段,而它应该是 job_spec 中的嵌套字段。所以目前,据我所知,避免这个错误的方法是仔细设置jobBody,我不知道有什么方法可以以更结构化的方式做到这一点。

于 2021-09-13T19:21:27.447 回答