6

我对一个大循环使用步进函数,到目前为止没问题,但是当我的循环超过 8000 次执行时,我遇到了错误“最大执行历史大小”,即 25000。

有没有没有历史事件的解决方案?

否则,我可以轻松迁移我的步进函数(3 lambda),因为 aws batch 会要求我重写很多代码..

非常感谢

4

2 回答 2

4

避免 25k 历史事件限制的一种方法是在循环中添加一个选择状态,该状态接受一个计数器或布尔值并决定退出循环。

在循环之外,您可以放置​​一个启动另一个执行的 lambda 函数(具有不同的 id)。在此之后,您当前的执行正常完成,另一个执行将继续完成工作。

请注意,下面示例中的“LoopProcessor”必须返回一个变量“$.breakOutOfLoop”才能跳出循环,该变量也必须在循环中的某处确定并通过。

根据您的用例,您可能需要重组您传递的数据。例如,如果您正在处理大量数据,您可能需要考虑使用 S3 对象并通过状态机执行将 ARN 作为输入/输出传递。如果您尝试做一个简单的循环,一种简单的方法是添加一个起始偏移量(将其视为一个全局计数器),该偏移量作为输入传递给执行,每个 LoopProcessor 任务将增加一个计数器(与开始偏移量作为初始值)。这类似于分页解决方案。

这是避免 25k 历史事件限制的 ASL 结构的基本示例:

{
  "Comment": "An example looping while avoiding the 25k event history limit.",
  "StartAt": "FirstState",
  "States": {

    "FirstState": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:FUNCTION_NAME",
      "Next": "ChoiceState"
    },

    "ChoiceState": {
      "Type" : "Choice",
      "Choices": [
        {
          "Variable": "$.breakOutOfLoop",
          "BooleanEquals": true,
          "Next": "StartNewExecution"
        }
      ],
      "Default": "LoopProcessor"
    },

    "LoopProcessor": {
      "Type" : "Task",
      "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:ProcessWork",
      "Next": "ChoiceState"
    },

    "StartNewExecution": {
      "Type" : "Task",
      "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:StartNewLooperExecution",
      "Next": "FinalState"
    },

    "FinalState": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:FUNCTION_NAME",
      "End": true
    }
  }
}

循环处理器示例

希望这可以帮助!

于 2017-07-06T21:38:55.740 回答
2

为了保证所有步骤及其顺序的执行,step函数存储每个状态完成后的执行历史,这种存储是历史执行大小限制的原因。

话虽如此,减轻此限制的一种方法是遵循@sunnyD 的答案。但是,它有以下限制

  1. 步进函数的调用者(如果有的话)不会得到完整数据的执行输出。相反,他获得了执行链中第一次执行的输出。
  2. 执行历史大小的数量限制在未来的版本中很有可能会增加,因此在这个数字上编写逻辑将需要您在每次增加或减少限制时修改代码/配置。

另一种替代解决方案是将阶跃函数安排为父阶跃函数和子阶跃函数。在这种安排中,父阶跃函数包含一个任务来循环整个数据集并为每个记录或记录集创建新的子阶跃函数执行(一个不会超过子SF的历史执行限制的数字)在您的数据中。父 step 函数中的第二步将等待一段时间,然后它会检查 Cloudwatch 指标是否完成所有子函数并退出输出。

关于这个解决方案需要记住的几件事是,

  1. startExecution API 将限制在 500 个桶大小,每秒 25 次重新填充。
  2. 确保您在父 SF 中的等待时间足以让子 SF 完成其执行,否则执行一个循环来检查子 SF 的完成情况。
于 2018-03-12T20:45:28.583 回答