32

我有一个用例,其中我有一个 AWS Step 函数,该函数在文件上传到 S3 时触发,从那里第一步运行 ffprobe 以从外部服务(例如 transloadit)获取文件的持续时间,输出被写入回到 S3。

我可以从该事件中创建一个新的步进函数,但我正在徘徊,如果可以在原始步进函数中有一个 Await 承诺,然后继续下一个 - 考虑到 ffprobe 可能需要更长的时间才能恢复。

任何关于如何解决这个问题的建议都非常感谢。

4

7 回答 7

30

AWS Step Functions 现在支持将长时间运行的步骤作为一流的异步回调。

这类似于@mixja 上面的答案,但简化了。工作流中的单个状态可以直接调用 Lambda、SNS、SQS 或 ECS 并等待调用SendTaskSuccess.

有一个为 SQS 记录的很好的示例,其中步骤函数发送消息并暂停工作流执行,直到某些东西提供回调。Lambda 将是等效的(假设像 transloadit 这样的主要处理发生在 Lambda 本身之外)

您的步进函数定义如下所示

"Invoke transloadit": {
  "Type": "Task",
  "Resource": "arn:aws:states:::lambda:invoke.waitForTaskToken",
  "Parameters": {
    "FunctionName": "InvokeTransloadit",
    "Payload": {
        "some_other_param": "...",
        "token.$": "$$.Task.Token"
     }
  },
  "Next": "NEXT_STATE"
}

然后在你的 Lambda 你会做类似的事情

def lambda_handler(event, context):
    token = event['token']

    # invoke transloadit via SSM, ECS, passing token along

然后在您的主要长期运行过程中,您将使用令牌发出回调,例如aws stepfunctions send-task-success --task-token $token从 shell 脚本/CLI 或类似的 API 调用。

于 2019-06-21T14:38:21.493 回答
4

当您将请求发送到 transloadit 时,将 s3 中步骤的 taskToken 保存在基于上传文件密钥的可预测密钥中。例如,如果媒体文件位于“s3://my-media-bucket/foobar/media-001.mp3”,您可以制作一个包含当前步骤的任务令牌的 JSON 文件,并使用相同的密钥存储它在不同的存储桶中,例如“s3://ffprobe-tasks/foobar/media-001.mp3.json”。在将媒体发送到 transloadit 的步骤结束时,不要在该步骤上调用成功或失败 - 让它继续运行。

然后当你得到transloadit结果准备好的s3通知时,你可以确定s3键来获取任务令牌('s3://ffprobe-tasks/foobar/media-001.json'),加载JSON(并删除它来自 s3) 并发送该任务的成功。step 函数将继续执行到下一个状态。

于 2017-08-28T13:36:28.403 回答
1

您通常希望将异步任务作为 Step Function 活动启动。这里的关键字是启动- 换句话说,一旦你的活动有一个待处理的动作,那就是你触发你的异步动作的时候。这样做的原因是您需要与未决活动关联的任务令牌 - 然后只要您的“未来”可以以某种方式包含此令牌(例如,您可以将其设置为参考或请求 ID),那么您就可以“完成”使用SendTaskSuccessSendTaskFailure调用成功或失败的活动。

启动任务有两种方法:

  1. 轮询一项新活动。您将设置 CloudWatch 计划事件以每 n 分钟调用一次GetActivityTask 。

  2. 在 step 函数中与您的活动并行触发一个新的“启动器”任务。这个发起者执行与#1 相同的调用,并调用GetActivityTask,唯一的区别是它立即触发并且不需要轮询机制。GetActivityTask 调用会阻塞,直到有新的活动任务可用,因此不存在竞争条件问题。请注意,您可能会从另一个执行中获取活动,因此该发起者只需要考虑活动的输入,而不是发起者本身接收的输入。

这是 Step Function 中 #2 的样子:

发起活动

以及与InitiateManualApprovalActivity任务相关的基本代码示例:

import boto3
import time

client = boto3.client('stepfunctions')
activity = "arn:aws:states:us-east-1:123456789012:activity:ManualStep"

def lambda_handler(event, context):
    print(event)
    # This will block until an activity task becomes available
    task = client.get_activity_task(activityArn=activity, workerName="test")
    print(task)
    # Perform your task here
    # In this example we continue on in the same function,
    # but the continuation could be a separate event, 
    # just as long as you can retrieve the task token
    time.sleep(60)
    response = client.send_task_success(taskToken=task['taskToken'], output=task['input'])
    print(response)
    return "done"
于 2018-04-19T12:34:03.883 回答
1

无法提出简单的解决方案,只能探索几个方向。

首先,Step Functions 有一种特定的方式来处理长时间运行的后台工作:活动。https://docs.aws.amazon.com/step-functions/latest/dg/concepts-activities.html它基本上是一个队列。

如果你想要 100% 无服务器,这将是复杂或丑陋的。

  • 或者,正如您所说,为每个文件创建新的 step 函数
  • Retry或者,使用自定义错误代码和子句在状态机中进行 S3 轮询循环

如果您可以为后台工作人员分配“1/8 微”实例,它并不优雅但很容易,并且可以通过即时反应来实现。低硬件要求暗示我们将只使用机器进行同步。

定义 StepFunction 活动,例如命名为video-duration。为即时反应定义 SQS 队列或为持续时间结果轮询 S3。

状态函数伪代码:

{
  StartAt: ffprobe
  ffprobe: {
    Type: Task
    Resource: arn:...lambda:launch-ffprobe
    Next: wait-duration
  }
  wait-duration: {
    Type: Task
    Resource: arn...activity:video-duration
    End: true
  }
}

后台工作者伪代码:

statemap = dict/map filename to result

thread1:
  loop:
    taskToken, input = SF.GetActivityTask('video-duration')  # long poll
    sync(key=input.filename, waiter=taskToken)
thread2:
  loop:
    msg = SQS.ReceiveMessage(...)  # or poll S3
    sync(key=msg.filename, duration=msg.result)

function sync(key, waiter, duration):
  state = statemap[key]
  if waiter:
    state.waiter = waiter
  if duration:
    state.duration = duration
  if state.waiter and state.duration:
    SF.SendTaskSuccess(state.waiter, state.duration)

S3触发伪代码:

if filename is video:
  SF.StartExecution(...)
else if filename is duration:
  content = S3.GetObject(filename)
  SQS.SendMessage(queue, content)
于 2017-03-06T08:05:34.643 回答
0

当我尝试结合 SFN 来编排 AWS Batch 作业时,我也遇到了这个问题。上面建议的做法是有问题的,因为您应该传递 taskToken,因此您需要从状态机的 lambda 轮询 TaskToken 从队列中,并将其传递给 S3 或其他地方,另一个 lambda 将提交活动状态.

问题是:当您轮询 taskToken 时,您无法知道它是否属于您的状态机实例。您可以改为在同一状态机的另一个实例上获取令牌。就个人而言,我认为如果 AWS 能够支持这个功能会很棒,他们很容易做到......

于 2018-03-29T14:49:44.477 回答
0

好吧,我会从https://aws.amazon.com/blogs/compute/implementing-serverless-manual-approval-steps-in-aws-step-functions-and-amazon-api-gateway/启发自己

您可以将其中的 API Gateway 替换为 AWS Lambda 函数,例如由 S3 事件触发(文档:http ://docs.aws.amazon.com/lambda/latest/dg/with-s3.html )。只要确保您的任务有适当的超时。

于 2017-03-21T23:04:54.320 回答
0

如果您知道 transloadit 完成后会将文件放在 S3 中的哪个位置,您可以循环轮询 S3。要进行轮询,您可以使用HeadObject然后检查响应的状态代码。

AWS Step Function 文档中的一个示例项目描述了这种轮询循环。无需使用您必须为执行付费的 Lambda,您可以直接请求 S3 API,如此所述。如果没有 Lambda,您只需为标准工作流中的状态转换付费

从文档中获取的状态机图。

于 2021-11-12T08:30:15.540 回答