7

我编写了一个从 s3 存储桶触发的 lambda,以解压缩 zip 文件并在其中处理文本文档。由于 lambda 内存的限制,我需要将我的流程转移到 AWS 批处理之类的东西上。如果我错了,请纠正我,但我的工作流程应该是这样的。

工作流程

我相信我需要编写一个 lambda 来将 s3 存储桶的位置放在亚马逊 SQS 上,如果 AWS 批处理可以读取该位置并进行所有解压缩/数据处理,那么它们的内存就更多。

这是我当前的 lambda,它接收由 s3 存储桶触发的事件,检查它是否是一个 zip 文件,然后将该 s3 密钥的名称推送到 SQS。我应该告诉 AWS 批处理开始在我的 lambda 中读取队列吗?总的来说,我对 AWS 完全陌生,不确定是否要从这里开始。

public class dockerEventHandler implements RequestHandler<S3Event, String> {

private static BigData app = new BigData();
private static DomainOfConstants CONST = new DomainOfConstants();
private static Logger log = Logger.getLogger(S3EventProcessorUnzip.class);

private static AmazonSQS SQS;
private static CreateQueueRequest createQueueRequest;
private static Matcher matcher;
private static String srcBucket, srcKey, extension, myQueueUrl;

@Override
public String handleRequest(S3Event s3Event, Context context) 
{
    try {
        for (S3EventNotificationRecord record : s3Event.getRecords())
        {
            srcBucket = record.getS3().getBucket().getName();
            srcKey = record.getS3().getObject().getKey().replace('+', ' ');
            srcKey = URLDecoder.decode(srcKey, "UTF-8");
            matcher = Pattern.compile(".*\\.([^\\.]*)").matcher(srcKey);

            if (!matcher.matches()) 
            {
                log.info(CONST.getNoConnectionMessage() + srcKey);
                return "";
            }
            extension = matcher.group(1).toLowerCase();

            if (!"zip".equals(extension)) 
            {
                log.info("Skipping non-zip file " + srcKey + " with extension " + extension);
                return "";
            }
            log.info("Sending object location to key" + srcBucket + "//" + srcKey);

            //pass in only the reference of where the object is located
            createQue(CONST.getQueueName(), srcKey);
        }
    }
    catch (IOException e)
    {
        log.error(e);           
    }
    return "Ok";
} 

/*
 * 
 * Setup connection to amazon SQS
 * TODO - Find updated api for sqs connection to eliminate depreciation
 * 
 * */
@SuppressWarnings("deprecation")
public static void sQSConnection() {
    app.setAwsCredentials(CONST.getAccessKey(), CONST.getSecretKey());       
    try{
        SQS = new AmazonSQSClient(app.getAwsCredentials()); 
        Region usEast1 = Region.getRegion(Regions.US_EAST_1);
        SQS.setRegion(usEast1);
    } 
    catch(Exception e){
        log.error(e);       
    }
}

//Create new Queue
public static void createQue(String queName, String message){
    createQueueRequest = new CreateQueueRequest(queName);
    myQueueUrl = SQS.createQueue(createQueueRequest).getQueueUrl();
    sendMessage(myQueueUrl,message);
}

//Send reference to the s3 objects location to the queue
public static void sendMessage(String SIMPLE_QUE_URL, String S3KeyName){
    SQS.sendMessage(new SendMessageRequest(SIMPLE_QUE_URL, S3KeyName));
}

//Fire AWS batch to pull from que
private static void initializeBatch(){
    //TODO
}

我已经设置了 docker 并了解了 docker 图像。我相信我的 docker 镜像应该包含所有代码来读取队列、解压缩、处理并将文件安装到 RDS 中,所有这些都在一个 docker 镜像/容器中。

我正在寻找做过类似事情的人,他们可以分享以提供帮助。类似的东西:

S3 先生:嘿 lambda 我有一个文件

Lambda 先生:好的,S3,我看到你了,嘿,aws batch 你能解压缩并对此做些什么吗?

Mr. Batch:Gotchya mr lambda,我会处理好这个,然后把它放在 RDS 或一些数据库中。

我还没有编写类/docker 映像,但我已经完成了所有代码来处理/解压缩并开始执行 rds。由于某些文件为 1gb 或更大,Lambda 仅限于内存。

4

1 回答 1

8

好的,在查看 Batch 上的 AWS 文档后,您不需要 SQS 队列。Batch 有一个名为 Job Queue 的概念,它类似于 SQS FIFO 队列,但不同之处在于这些作业队列具有优先级,其中的作业可以依赖于其他作业。基本流程是:

  1. 首先,奇怪的部分是设置 IAM 角色,以便容器代理可以与容器服务通信,并且 AWS 批处理能够在需要时启动各种实例(如果您使用 Spot 实例,还需要一个单独的角色)。有关所需权限的详细信息,请参见第 54 页左右的此文档 (PDF) 。
  2. 现在完成后,您设置了一个计算环境。这些是保存容器的 EC2 按需实例或现货实例。作业在容器级别上运行。这个想法是您的计算环境是您的作业容器可以利用的最大资源分配。一旦达到该限制,您的工作就必须等待资源被释放。
  3. 现在您创建一个作业队列。这会将作业与您创建的计算环境相关联。
  4. 现在您创建一个作业定义。好吧,从技术上讲,您不必也可以通过 lambda 来完成,但这会使事情变得更容易一些。您的工作定义将指示您的工作需要哪些容器资源(您当然也可以在 lambda 中覆盖它)
  5. 现在这一切都完成了,您将要创建一个 lambda 函数。这将由您的 S3 存储桶事件触发。该函数需要必要的 IAM 权限才能针对批处理服务运行提交作业(以及任何其他权限)。基本上所有 lambda 需要做的就是调用 AWS 批处理提交作业。您需要的基本参数是作业队列和作业定义。您还将为所需的 zip 设置 S3 密钥作为作业的参数。
  6. 现在,当触发相应的 S3 事件时,它会调用 lambda,然后将作业提交到 AWS 批处理作业队列。然后假设设置都很好,它会很高兴地提取资源来处理你的工作。请注意,根据 EC2 实例大小和分配的容器资源,这可能需要一些时间(比准备 Lambda 函数要长得多)。
于 2017-06-27T03:24:53.577 回答