
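I'm new to AWS Step Functions and AWS Batch. I'm trying to integrate an AWS Batch job with a Step Functions state machine. The Batch job runs a simple Python script that outputs a string value (this is a simplified version of the real requirement). I need the script's output to be available to the next state in the state machine. How can I do this? The Batch job's output does not contain the result of the Python script. Instead, it contains all the container-related information along with the input values.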

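Example: the AWS Batch job runs a Python script that outputs "Hello World". I need "Hello World" to be available to the next state of the state machine so that the Lambda associated with that state can use it.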


2 Answers


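I was able to do it. Below is my state machine. I took the sample project "Manage a Batch Job (AWS Batch, Amazon SNS)" and modified it to use two Lambdas that pass the input/output along.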

{
  "Comment": "An example of the Amazon States Language for notification on an AWS Batch job completion",
  "StartAt": "Submit Batch Job",
  "TimeoutSeconds": 3600,
  "States": {
    "Submit Batch Job": {
      "Type": "Task",
      "Resource": "arn:aws:states:::batch:submitJob.sync",
      "Parameters": {
        "JobName": "BatchJobNotification",
        "JobQueue": "arn:aws:batch:us-east-1:1234567890:job-queue/BatchJobQueue-737ed10e7ca3bfd",
        "JobDefinition": "arn:aws:batch:us-east-1:1234567890:job-definition/BatchJobDefinition-89c42b1f452ac67:1"
      },
      "Next": "Notify Success",
      "Catch": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "Notify Failure"
        }
      ]
    },
    "Notify Success": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:1234567890:function:readcloudwatchlogs",
      "Parameters": {
        "LogStreamName.$": "$.Container.LogStreamName"
      },
      "ResultPath": "$.lambdaOutput",
      "Next": "ConsumeLogs"
    },
    "ConsumeLogs": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:1234567890:function:consumelogs",
      "Parameters": {
        "randomstring.$": "$.lambdaOutput.logs"
      },
      "End": true
    },
    "Notify Failure": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "Message": "Batch job submitted through Step Functions failed",
        "TopicArn": "arn:aws:sns:us-east-1:1234567890:StepFunctionsSample-BatchJobManagement17968f39-e227-47ab-9a75-08a7dcc10c4c-SNSTopic-1GR29R8TUHQY8"
      },
      "End": true
    }
  }
}

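The key to reading the logs is that the output of Submit Batch Job contains the LogStreamName. I pass it to the Lambda function:readcloudwatchlogs, which reads the logs, and then pass the logs it read to the next function, function:consumelogs. You can see the logs printed by the consumelogs function in the attached screenshot.

Output of the Submit Batch Job state (abridged):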


{
  "Attempts": [
    {
      "Container": {
        "ContainerInstanceArn": "arn:aws:ecs:us-east-1:1234567890:container-instance/BatchComputeEnvironment-4a1593ce223b3cf_Batch_7557555f-5606-31a9-86b9-83321eb3e413/6d11fdbfc9eb4f40b0d6b85c396bb243",
        "ExitCode": 0,
        "LogStreamName": "BatchJobDefinition-89c42b1f452ac67/default/2ad955bf59a8418893f53182f0d87b4b",
        "NetworkInterfaces": [],
        "TaskArn": "arn:aws:ecs:us-east-1:1234567890:task/BatchComputeEnvironment-4a1593ce223b3cf_Batch_7557555f-5606-31a9-86b9-83321eb3e413/2ad955bf59a8418893f53182f0d87b4b"
      },
      "StartedAt": 1611329367577,
      "StatusReason": "Essential container in task exited",
      "StoppedAt": 1611329367748
    }
  ],
  "Container": {
    "Command": [
      "echo",
      "Hello world"
    ],
    "ContainerInstanceArn": "arn:aws:ecs:us-east-1:1234567890:container-instance/BatchComputeEnvironment-4a1593ce223b3cf_Batch_7557555f-5606-31a9-86b9-83321eb3e413/6d11fdbfc9eb4f40b0d6b85c396bb243",
    "Environment": [
      {
        "Name": "MANAGED_BY_AWS",
        "Value": "STARTED_BY_STEP_FUNCTIONS"
      }
    ],
    "ExitCode": 0,
    "Image": "137112412989.dkr.ecr.us-east-1.amazonaws.com/amazonlinux:latest",
    "LogStreamName": "BatchJobDefinition-89c42b1f452ac67/default/2ad955bf59a8418893f53182f0d87b4b",
    "TaskArn": "arn:aws:ecs:us-east-1:1234567890:task/BatchComputeEnvironment-4a1593ce223b3cf_Batch_7557555f-5606-31a9-86b9-83321eb3e413/2ad955bf59a8418893f53182f0d87b4b",
..
  },
..
  "Tags": {
    "resourceArn": "arn:aws:batch:us-east-1:1234567890:job/d36ba07a-54f9-4acf-a4b8-3e5413ea5ffc"
  }
}
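To make the data flow between the states concrete, here is a minimal plain-Python sketch (no AWS calls) of the two JSONPath selections and the ResultPath used in the state machine above. It only models them with dictionary lookups on an abridged version of this output; the function names are hypothetical, and this is not how Step Functions itself evaluates paths.

```python
import copy


def build_lambda_input(batch_output: dict) -> dict:
    # Models "Parameters": {"LogStreamName.$": "$.Container.LogStreamName"} in Notify Success
    return {"LogStreamName": batch_output["Container"]["LogStreamName"]}


def apply_result_path(state_input: dict, lambda_result: dict) -> dict:
    # Models "ResultPath": "$.lambdaOutput": keep the state input, add the result under lambdaOutput
    output = copy.deepcopy(state_input)
    output["lambdaOutput"] = lambda_result
    return output


def build_consumer_input(state_output: dict) -> dict:
    # Models "Parameters": {"randomstring.$": "$.lambdaOutput.logs"} in ConsumeLogs
    return {"randomstring": state_output["lambdaOutput"]["logs"]}


if __name__ == "__main__":
    batch_output = {
        "Container": {
            "ExitCode": 0,
            "LogStreamName": "BatchJobDefinition-89c42b1f452ac67/default/2ad955bf59a8418893f53182f0d87b4b",
        }
    }
    print(build_lambda_input(batch_output))
    after_notify = apply_result_path(batch_output, {"logs": "Hello world"})
    print(build_consumer_input(after_notify))
```

Because of the ResultPath, the original Batch output is still available to ConsumeLogs alongside the Lambda's result, rather than being replaced by it.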

  • Read logs Lambda code (readcloudwatchlogs):
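import boto3

# CloudWatch Logs client; AWS Batch writes container output to the /aws/batch/job log group by default
client = boto3.client('logs')

def lambda_handler(event, context):
    print(event)
    # The state machine passes the Batch job's LogStreamName in the Lambda input
    response = client.get_log_events(
        logGroupName='/aws/batch/job',
        logStreamName=event['LogStreamName'],
        startFromHead=True  # read oldest events first so events[0] is the script's first line
    )
    events = response['events']
    # Return the first log line (e.g. "Hello world"), or an empty string if nothing was logged
    log = {'logs': events[0]['message'] if events else ''}
    return log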
  • Consume logs Lambda code (consumelogs):
import json

print('Loading function')


def lambda_handler(event, context):
    # The ConsumeLogs state passes the log text as "randomstring"
    print(json.dumps(event))
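The readcloudwatchlogs function above returns only the first log event, which is enough for a one-line "Hello world". If the script prints several lines, or more than fits in one response (get_log_events returns at most 1 MB or 10,000 events per call), a sketch like the following could collect everything by following nextForwardToken, which the API returns unchanged once the end of the stream is reached. The helper name read_all_messages is hypothetical, and client is assumed to be a boto3 CloudWatch Logs client or any object with the same get_log_events method.

```python
from typing import Any, Dict, List, Optional


def read_all_messages(client: Any, log_group: str, log_stream: str) -> List[str]:
    """Collect every message in a log stream, oldest first (hypothetical helper)."""
    messages: List[str] = []
    kwargs: Dict[str, Any] = {
        "logGroupName": log_group,
        "logStreamName": log_stream,
        "startFromHead": True,
    }
    token: Optional[str] = None
    while True:
        if token is not None:
            kwargs["nextToken"] = token
        response = client.get_log_events(**kwargs)
        messages.extend(event["message"] for event in response["events"])
        next_token = response["nextForwardToken"]
        # At the end of the stream, get_log_events hands back the same token we sent
        if next_token == token:
            break
        token = next_token
    return messages
```

Inside lambda_handler it could be used as `{'logs': '\n'.join(read_all_messages(client, '/aws/batch/job', event['LogStreamName']))}`.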


answered 2021-01-22T19:57:35.987

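You can pass the Step Functions execution ID ($$.Execution.Id) to the Batch job; the job can then write its response to DynamoDB, keyed by the execution ID (or by the execution ID plus another field). You then need a subsequent step that reads directly from DynamoDB to pick up the job's response.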

I have been looking for a way to do this without the extra step, but no dice so far.
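A minimal sketch of this approach, under these assumptions: the Submit Batch Job state passes the execution ID into the container, for example via "ContainerOverrides": {"Environment": [{"Name": "EXECUTION_ID", "Value.$": "$$.Execution.Id"}]}, and a DynamoDB table (hypothetically named BatchJobResults) has a string partition key ExecutionId. The environment variable, table, and attribute names are all hypothetical. save_result would run in the Batch job and load_result in the follow-up step; both accept a boto3 DynamoDB Table resource or any object with the same put_item/get_item methods.

```python
import os
from typing import Any, Optional

# Hypothetical table name (overridable via a hypothetical RESULT_TABLE variable) and key attribute
TABLE_NAME = os.environ.get("RESULT_TABLE", "BatchJobResults")
KEY_NAME = "ExecutionId"


def save_result(table: Any, execution_id: str, result: str) -> None:
    """Batch side: store the script's output under the Step Functions execution ID."""
    table.put_item(Item={KEY_NAME: execution_id, "Result": result})


def load_result(table: Any, execution_id: str) -> Optional[str]:
    """Follow-up step: fetch the stored output, or None if nothing was written yet."""
    item = table.get_item(Key={KEY_NAME: execution_id}).get("Item")
    return item["Result"] if item else None
```

In the Batch container the wiring could be `table = boto3.resource('dynamodb').Table(TABLE_NAME)` followed by `save_result(table, os.environ['EXECUTION_ID'], 'Hello World')`. The follow-up step could be a Lambda calling load_result, or the Step Functions DynamoDB GetItem integration with the key taken from $$.Execution.Id.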

answered 2021-09-27T16:23:38.993