-1

我使用 envContainerOverrides 从 stepfunctions 读取输出,然后我用这个 nodejscode 调用我的批处理作业:这里我正在读取传递给批处理作业的环境变量。

阶跃函数输出:

{"bucketName":"bucketName","filesList":["filelist-bucket/filelist1.txt","filelist-bucket/filelist2.txt"]}

如果它是一个数组 [filelist-bucket/filelist1.txt","filelist-bucket/filelist2.txt],则此 nodejs 代码无法获取,如果它是单个值,则它可以完美运行。

我想将此 nodejs 代码转换为 java :

'use strict';

const AWS = require('aws-sdk');

console.log('Loading function');

exports.handler = (event, context, callback) => {
    // Take the data from step 1 and modify, send to standard output
    var comment = event.Comment ;

    var envContainerOverrides ={
        "environment" :[
          {
           "name":"s3Bucket",
           "value":event.bucketName
          },
          {
           "name":"s3FileList",
           "value":event.filesListUrl
          }
        ]
    };
    const params = {
        jobDefinition: process.env.JOB_DEFINITION,
        jobName: process.env.JOB_NAME,
        jobQueue:process.env.JOB_QUEUE,
        containerOverrides: envContainerOverrides || null,
        parameters: event.parameters || null,
    };


    // Submit the Batch Job
    new AWS.Batch().submitJob(params, (err, data) => {
        if (err) {
            console.error(err);
            const message = `Error calling SubmitJob for: ${event.jobName}`;
            console.error(message);
            callback(message);
        } else {
            const jobId = data.jobId;
            console.log('jobId:', jobId);
            callback(null, "Job Id : "+jobId);
        }
    });



};

我正在做这样的事情:

public class InitiateBatchJob1 {

    public static BatchJobRequest process(BatchJobRequest batchJobRequest) throws Exception {


        String s3Bucket = batchJobRequest.getBucketName();
        List<String> s3FileList = batchJobRequest.getFilesListUrl();

        Job job = new Job();
        job.setJobDefinition("testbatchjobenv:2");
        job.setJobQueue("nbatchjobqueue");
        job.setJobName("Filedownload");


        /*// Submit the Batch Job
        new AWS.Batch().submitJob(params, (err, data) => {
            if (err) {
                console.error(err);
                const message = `Error calling SubmitJob for: ${event.jobName}`;
                console.error(message);
                callback(message);
            } else {
                const jobId = data.jobId;
                console.log('jobId:', jobId);
                callback(null, "Job Id : "+jobId);
            }
        });
*/



        return null;
    }


}

批处理作业请求.java

import java.util.List;

public class BatchJobRequest {

    private String bucketName;

    private List<String> filesListUrl;

    public String getBucketName() {
        return bucketName;
    }

    public void setBucketName(String bucketName) {
        this.bucketName = bucketName;
    }

    public List<String> getFilesListUrl() {
        return filesListUrl;
    }

    public void setFilesListUrl(List<String> filesListUrl) {
        this.filesListUrl = filesListUrl;
    }

}

工作.java:

public class Job {

    private String jobDefinition;
    private String jobName;
    private String jobQueue;
    private String containerOverrides;
    private String parameters;
    public String getJobDefinition() {
        return jobDefinition;
    }
    public void setJobDefinition(String jobDefinition) {
        this.jobDefinition = jobDefinition;
    }
    public String getJobName() {
        return jobName;
    }
    public void setJobName(String jobName) {
        this.jobName = jobName;
    }
    public String getJobQueue() {
        return jobQueue;
    }
    public void setJobQueue(String jobQueue) {
        this.jobQueue = jobQueue;
    }
    public String getContainerOverrides() {
        return containerOverrides;
    }
    public void setContainerOverrides(String containerOverrides) {
        this.containerOverrides = containerOverrides;
    }
    public String getParameters() {
        return parameters;
    }
    public void setParameters(String parameters) {
        this.parameters = parameters;
    }


}

我已经为 AWSBatch http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/batch/AWSBatch.html#cancelJob-com.amazonaws.services.batch.model 找到了这个 api。 CancelJobRequest-

但不确定它是否在我的课堂上使用正确的 api。我在网上搜索了在 java 中使用 aws batch 的链接并不多。

基本上我需要知道如何设置作业定义、作业名称、作业队列并使用 java 类提交批处理作业。任何人都可以帮我解决这个问题。

4

1 回答 1

0

你必须这样做: http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/batch/AWSBatch.html#submitJob-com.amazonaws.services.batch.model。提交工作请求-

AWSBatch client = AWSBatchClientBuilder.standard().build();
    SubmitJobRequest request = new SubmitJobRequest().withJobName("some-name")
            .withJobQueue("job-queue-name")
            .withJobDefinition("job-definition-name-with-revision-number:1");
    SubmitJobResult response = client.submitJob(request);

我将此代码与 lambda 函数一起使用,并将 IAM 角色与我在 aws 控制台中创建的 AWSBatchFullAccess 一起使用。因此,在 aws 上构建和加载 jar 之后,我的“客户端”使用从 lambda 获取的数据进行了初始化。在您的应用程序中,您似乎还需要添加数据来初始化客户端。你必须使用方法

AWSBatch build(AwsSyncClientParams params)

看 AWSBatchClientBuilder 类。

您仍然需要创建作业队列和作业定义。另外,我建议您开始使用 aws 控制台。这是了解 aws 批处理工作流的好教程http://technology.finra.org/code/enjoying-auto-scaling-integrated-authentication-low-host-cost.html http://www.awsomeblog.com/analysing- exif-data-with-aws-batch/。在本教程之后,您可以在 myHandler 方法中添加用于提交作业的代码,并查看作业如何启动。

于 2017-12-12T13:33:22.553 回答