0

当前设置:

我目前有一个 Step Functions 状态机,它启动一个任务状态(它调用一个 Lambda 函数),然后是一个映射状态(它向 Batch 提交一个作业),定义如下

状态机定义

(注:地区和账户ID已被省略并替换为虚拟变量ACCOUNT_INFO

{
  "StartAt": "Populate EFS",
  "States": {
    "Populate EFS": {
      "Next": "MapState",
      "Type": "Task",
      "InputPath": "$",
      "ResultPath": "$.populate_efs_result",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "FunctionName": "arn:aws:lambda:{ACCOUNT_INFO}:function:PopulateEFSLambda",
        "Payload.$": "$"
      }
    },
    "MapState": {
      "Type": "Map",
      "End": true,
      "ResultPath": "$.metadata.run_info",
      "InputPath": "$",
      "Iterator": {
        "StartAt": "TaskState",
        "States": {
          "TaskState": {
            "Type": "Task",
            "End": true,
            "InputPath": "$",
            "ResultPath": null,
            "Resource": "arn:aws:states:::batch:submitJob.sync",
            "Parameters": {
              "JobDefinition": "arn:aws:batch:{ACCOUNT_INFO}:job-definition/BatchJobDefCfn:1",
              "JobName": "test",
              "JobQueue": "arn:aws:batch:{ACCOUNT_INFO}:job-queue/BatchQueue123",
              "ContainerOverrides": {
                "Command": [
                  "sh",
                  "-c",
                  "entrypoint.pl -i /NGS/${sequencer}/${run_id}/ -s ${sample_name}"
                ],
                "Environment": [
                  {
                    "Name": "run_id",
                    "Value.$": "$.run_id"
                  },
                  {
                    "Name": "sample_name",
                    "Value.$": "$.sample_name"
                  },
                  {
                    "Name": "sequencer",
                    "Value.$": "$.sequencer"
                  }
                ]
              }
            }
          }
        }
      }
    }
  }
}

状态机输入

{
  "metadata": {
    "run_info": [
      {
        "sample_name": "SAMPLE_X",
        "sequencer": "Nextseq"
      },
      {
        "sample_name": "SAMPLE_Y",
        "sequencer": "Nextseq"
      },
      {
        "sample_name": "SAMPLE_Z",
        "sequencer": "Nextseq"
      }
    ]
  }
}

Lambda 输出(为简单起见缩短)

{"populate_efs_result": {
    "ExecutedVersion": "$LATEST",
    "Payload": "RUN_1"}

预期结果:

第二步 ( MapState) 需要来自机器输入 (sample_namesequencer) 的信息,以及 Lambda 函数在populate_efs_result.Payload( run_id) 中返回的信息,因此两者都需要包含在 Map 状态输入的事件对象中。然而,到目前为止,在我的尝试中,地图状态的输入要么是机器输入,要么是 Lambda 输出,而不是两者。

我已尝试更改 Map 状态定义中的InputPathItemsPath参数,还尝试在 Map 状态定义中包含以下内容,但这些方法均不起作用:Parameters: {"new_run_id.$": "$.populate_efs_result.Payload"}.

4

1 回答 1

0

一个简单但不那么优雅的解决方案可能是将您的 lambda 步骤移动到 Map 状态中。优点是来自 lambda 的响应将在映射状态的上下文中(如果您的 lambda 响应特定于映射状态的每次迭代,则可能需要这样做)。缺点是您的 lambda 函数需要为地图的每次迭代执行,而 lambda 函数又快又便宜,它仍然不是一个完美的解决方案。

另一种方法是将执行输入传递给该 lambda,然后扩展 lambda 以修改run_info数组以包含所需的数据。然后将此修改后的数组作为InputPath

于 2021-09-27T17:23:47.933 回答