2

我正在尝试使用状态机语言中定义的 Parallel 和 Catch 块在我的步进函数流中添加错误处理。

以下是我的步骤功能的流程图:

在此处输入图像描述

因为我想要一个用于所有步进函数的通用错误处理程序,所以我将它们包装在一个 Parallel 块中,并添加了一个通用 Catch 块来捕获任何步进函数中的任何错误。在浏览各种示例和博客时,我遵循了这个链接并实现了类似的方法。

我观察到的是,每当任何状态引发异常时,控件都会进入 catch 块。catch 块的输入是引发的异常,该异常包含 JSON 对象中的错误和原因。由于我想要错误以及传递给该状态的输入,因此我在 catch 块中将ResultPath添加为“$.error”。以下是定义状态机的 JSON 规范。

    {
  "StartAt": "Try",
  "States": {
    "Try": {
      "Type": "Parallel",
      "Branches": [
        {
          "StartAt": "Step-1",
          "States": {
            "Step-1": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:eu-west-1:1234:function:step-1-lambda",
              "Next": "Step-2"
            },
            "Step-2": {
              "Type": "Choice",
              "Choices": [
                {
                  "Variable": "$.some_variable",
                  "StringEquals": "some_string",
                  "Next": "Step-3"
                },
                {
                  "Variable": "$.some_variable",
                  "StringEquals": "some_other_string",
                  "Next": "Step-4"
                }
              ],
              "Default": "Step-6"
            },
            "Step-3": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:eu-west-1:1234:function:step-3-lambda",
              "Next": "Step-6"
            },
            "Step-4": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:eu-west-1:1234:function:step-4-lambda",
              "Next": "Step-6"
            },
            "Step-6": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:eu-west-1:1234:function:step-6-lambda",
              "End": true
            }
          }
        }
      ],
      "Catch": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "ResultPath": "$.error",
          "Next": "ErrorHandler"
        }
      ],
      "Next": "UnwrapOutput"
    },
    "UnwrapOutput": {
      "Type": "Pass",
      "InputPath": "$[0]",
      "End": true
    },
    "ErrorHandler": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:eu-west-1:1234:function:step-7-lambda",
      "End": true
    }
  }
}

例如,考虑第 4 步生成异常。这个状态的输入是:

{
   "foo": "abc",
   "bar": "def"
}

触发状态机的输入是:

{
  "boo": "jkl",
   "baz": "mno"
}

在 ErrorHandler 中,当第 4 步生成异常时,我期望 ErrorHandler 状态的输入是:

{
  "foo": "abc",
   "bar": "def",
   "error": {
       "Error": "SomeError",
       "Cause": "SomeCause"
   }
}

但是,接收到的输入包含用于触发流的原始输入。

{
  "boo": "jkl",
   "baz": "mno",
   "error": {
       "Error": "SomeError",
       "Cause": "SomeCause"
   }
}

我需要访问导致 ErrorHandler 异常的状态的输入字段。使用“$”它提供用于触发流程的输入。有没有办法可以做到这一点?

任何帮助将不胜感激,我一直在努力解决这个问题。

4

1 回答 1

0

我只晚了 10 个月,没那么多哈哈,但我希望你已经找到了解决方案,无论如何,我会分享我的两分钱,以便我可以帮助另一个开发人员,或者更好,有人可以告诉我一个更好的方法来做到这一点!

首先,让我们看看我们有哪些场景:

  • 同步作业执行
  • 异步作业执行

我们的目标:以某种方式访问​​触发错误的作业

第一个解决方案 - 适用于所有场景:

  • 基本上,将自定义 try catch 添加到您的所有作业资产中,换句话说,您的 lambda 函数应该抛出一个错误,提供有关它正在使用的作业的信息。我不太喜欢这种方法,因为您正在更改孤立的函数以在状态机中实现一些逻辑。最后,您将两个独立的概念结合在一起,您的状态机不应该需要外部工具来操作和记录自己的上下文。我在这里可能是错的,但这只是我的两分钱,请随意冒犯我的家人(开个玩笑,但如你所愿纠正我)。

第二种解决方案 - 应用于 Sych 作业执行

  • 当您在状态机中添加“addCatch”时,默认行为是错误输出以覆盖步骤输入。要解决这个问题,您只需要更改 addCatch resultPath,这样您就可以将错误输出与步骤输入一起存储。

    EX:“Catch”:[{“ErrorEquals”:[“States.All”],“Next”:“ErrorHandler”“ResultPath”:“$.error-info”}]

但是为什么这很重要??????
  • 这样,您将能够访问 errorHandlerJob 中的步骤输入,这意味着您始终可以将 stepName 传递给下一步输入,这样您就可以始终知道哪个作业失败了。而且您不会通过直接更改 lambda 函数来做到这一点,而是通过使用作业的属性来解决耦合问题!但这在 ASYNC 场景中不起作用,我将在接下来进行解释。

第三种解决方案——应用于异步作业执行

  • 以前的解决方案在这里不起作用,因为在这种情况下,您只能访问原始输入,因为您使用的是并行分支。所以我在这里所做的与上一个案例类似。我在并行分支中添加了 Pass 状态,这些 Pass 状态负责同步调用我的作业,而且我的所有作业都有自己的 errorHandlingJob 而不是不同的 LAMBDA 函数。我没有在 AWS 上创建新资源,只有一个 HandleError Lambda 函数,因此我可以将监控重点放在那个特定函数上。但是,我使用它为状态机必须执行的每个作业创建一个 errorHandlingJob。
  • 缺点是你的状态机现在有一个巨大的图表,但好的部分是你现在可以记录哪个作业失败了。
没有任何抽象,它会是这样的“使用 CDK”
    const job1 = new tasks.LambdaInvoke(scope, 'First Job -- PASS', {
        lambdaFunction: function1,
        outputPath: '$.Payload'
    })

    const job2 = new tasks.LambdaInvoke(scope, 'Second Job -- PASS', {
        lambdaFunction: function2,
        outputPath: '$.Payload'
    })

    const job3 = new tasks.LambdaInvoke(scope, 'Third Job -- PASS', {
        lambdaFunction: function3,
        outputPath: '$.Payload'
    })

    const generateHandleErrorJob = () => new tasks.LambdaInvoke(scope, `Handle Error Job ${Math.random() * 160000000}`, {
        lambdaFunction: functionError,
        outputPath: '$.Payload'
    })

    const jobToThrowError = new tasks.LambdaInvoke(scope, 'Job To Throw Error -- PASS', {
        lambdaFunction: fucntionThrowError,
        outputPath: '$.Payload',
    })

    const generatePassCheckSetep = (stepName: string) => new sfn.Pass(scope, `Pass: ${stepName}`, {
        resultPath: '$.step-info',
        result: sfn.Result.fromObject({
            step: stepName
        })
    })

    const definition = new sfn.Parallel(scope, 'Parallel Execution -- PASS')
        .branch(generatePassCheckSetep('job1').next(job1.addCatch(generateHandleErrorJob(), {resultPath: '$.error-info'})))
        .branch(generatePassCheckSetep('jobToThrowError').next(jobToThrowError.addCatch(generateHandleErrorJob(), {resultPath: '$.error-info'})))
        .branch(generatePassCheckSetep('job2').next(job2.addCatch(generateHandleErrorJob(), {resultPath: '$.error-info'})))
        .next(job3)

    new sfn.StateMachine(scope, id, {
        definition,
        timeout: cdk.Duration.minutes(3)
    })

但我还创建了一个抽象“ParallelStateMachineCatch”,因此您可以像这样使用:

this.definition = new ParallelStateMachineCatch(this, 
}, handleErrorFunction)
  .branchCatch(job1)
  .branchCatch(job2)
  .branchCatch(job3)
  .branchCatch(job4)
  .branchCatch(job5)
  .branchCatch(job6)
  .next(final)

}

这是 ParallelStateMachineCatch 代码:

import { Construct, Duration } from 'monocdk'
import { NodejsFunction } from 'monocdk/aws-lambda-nodejs'
import { Pass,Result, Parallel, ParallelProps } from 'monocdk/aws-stepfunctions'
import { LambdaInvoke } from 'monocdk/aws-stepfunctions-tasks'

export interface DefinitionProps {
  sonosEnvironment: string
  region: string
  accountNumber: string
}

export class ParallelStateMachineCatch extends Parallel {
  private errorHandler: NodejsFunction

  constructor(scope: Construct, id: string, props: ParallelProps, errorHandler: NodejsFunction) {
    super(scope, id, props)
    this.errorHandler = errorHandler
  }



  branchCatch(task: LambdaInvoke): ParallelStateMachineCatch {
    const randomId = Math.random().toString().replace('0.', '')
    const passInputJob = ParallelStateMachineCatch.generatePassInput(this, task.id, randomId)
    const handleErrorJob = ParallelStateMachineCatch.generateHandleErrorJob(this, this.errorHandler, randomId)
    const resultPath = '$.error-info'

    this.branch(passInputJob.next(task.addCatch(handleErrorJob, { resultPath })))

    return this
  }

  private static generateHandleErrorJob(scope: Construct, errorHandler: NodejsFunction, randomId: string): LambdaInvoke {
    return new LambdaInvoke(scope, `Handle Error ${ randomId }`, {
      lambdaFunction: errorHandler,
      outputPath: '$.Payload',
      timeout: Duration.seconds(5),
    })
  }

  private static generatePassInput(scope: Construct, stepName: string, randomId: string): Pass {
    return new Pass(scope, `Pass Input ${ randomId }`, {
      resultPath: '$.step-info',
      result: Result.fromObject({
        name: stepName
      })
    })
  }

}

无论如何,我希望我能帮助别人,这就是我设法解决这个问题的方法。请随时教我更好的方法!Tks 好运和好代码

于 2021-06-07T14:44:25.560 回答