我有一个用例,其中我有一个 AWS Step 函数,该函数在文件上传到 S3 时触发,从那里第一步运行 ffprobe 以从外部服务(例如 transloadit)获取文件的持续时间,输出被写入回到 S3。
我可以从该事件中创建一个新的步进函数,但我正在徘徊,如果可以在原始步进函数中有一个 Await 承诺,然后继续下一个 - 考虑到 ffprobe 可能需要更长的时间才能恢复。
任何关于如何解决这个问题的建议都非常感谢。
我有一个用例,其中我有一个 AWS Step 函数,该函数在文件上传到 S3 时触发,从那里第一步运行 ffprobe 以从外部服务(例如 transloadit)获取文件的持续时间,输出被写入回到 S3。
我可以从该事件中创建一个新的步进函数,但我正在徘徊,如果可以在原始步进函数中有一个 Await 承诺,然后继续下一个 - 考虑到 ffprobe 可能需要更长的时间才能恢复。
任何关于如何解决这个问题的建议都非常感谢。
AWS Step Functions 现在支持将长时间运行的步骤作为一流的异步回调。
这类似于@mixja 上面的答案,但简化了。工作流中的单个状态可以直接调用 Lambda、SNS、SQS 或 ECS 并等待调用SendTaskSuccess
.
有一个为 SQS 记录的很好的示例,其中步骤函数发送消息并暂停工作流执行,直到某些东西提供回调。Lambda 将是等效的(假设像 transloadit 这样的主要处理发生在 Lambda 本身之外)
您的步进函数定义如下所示
"Invoke transloadit": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke.waitForTaskToken",
"Parameters": {
"FunctionName": "InvokeTransloadit",
"Payload": {
"some_other_param": "...",
"token.$": "$$.Task.Token"
}
},
"Next": "NEXT_STATE"
}
然后在你的 Lambda 你会做类似的事情
def lambda_handler(event, context):
token = event['token']
# invoke transloadit via SSM, ECS, passing token along
然后在您的主要长期运行过程中,您将使用令牌发出回调,例如aws stepfunctions send-task-success --task-token $token
从 shell 脚本/CLI 或类似的 API 调用。
当您将请求发送到 transloadit 时,将 s3 中步骤的 taskToken 保存在基于上传文件密钥的可预测密钥中。例如,如果媒体文件位于“s3://my-media-bucket/foobar/media-001.mp3”,您可以制作一个包含当前步骤的任务令牌的 JSON 文件,并使用相同的密钥存储它在不同的存储桶中,例如“s3://ffprobe-tasks/foobar/media-001.mp3.json”。在将媒体发送到 transloadit 的步骤结束时,不要在该步骤上调用成功或失败 - 让它继续运行。
然后当你得到transloadit结果准备好的s3通知时,你可以确定s3键来获取任务令牌('s3://ffprobe-tasks/foobar/media-001.json'),加载JSON(并删除它来自 s3) 并发送该任务的成功。step 函数将继续执行到下一个状态。
您通常希望将异步任务作为 Step Function 活动启动。这里的关键字是启动- 换句话说,一旦你的活动有一个待处理的动作,那就是你触发你的异步动作的时候。这样做的原因是您需要与未决活动关联的任务令牌 - 然后只要您的“未来”可以以某种方式包含此令牌(例如,您可以将其设置为参考或请求 ID),那么您就可以“完成”使用SendTaskSuccess或SendTaskFailure调用成功或失败的活动。
启动任务有两种方法:
轮询一项新活动。您将设置 CloudWatch 计划事件以每 n 分钟调用一次GetActivityTask 。
在 step 函数中与您的活动并行触发一个新的“启动器”任务。这个发起者执行与#1 相同的调用,并调用GetActivityTask,唯一的区别是它立即触发并且不需要轮询机制。GetActivityTask 调用会阻塞,直到有新的活动任务可用,因此不存在竞争条件问题。请注意,您可能会从另一个执行中获取活动,因此该发起者只需要考虑活动的输入,而不是发起者本身接收的输入。
这是 Step Function 中 #2 的样子:
以及与InitiateManualApprovalActivity任务相关的基本代码示例:
import boto3
import time
client = boto3.client('stepfunctions')
activity = "arn:aws:states:us-east-1:123456789012:activity:ManualStep"
def lambda_handler(event, context):
print(event)
# This will block until an activity task becomes available
task = client.get_activity_task(activityArn=activity, workerName="test")
print(task)
# Perform your task here
# In this example we continue on in the same function,
# but the continuation could be a separate event,
# just as long as you can retrieve the task token
time.sleep(60)
response = client.send_task_success(taskToken=task['taskToken'], output=task['input'])
print(response)
return "done"
无法提出简单的解决方案,只能探索几个方向。
首先,Step Functions 有一种特定的方式来处理长时间运行的后台工作:活动。https://docs.aws.amazon.com/step-functions/latest/dg/concepts-activities.html它基本上是一个队列。
如果你想要 100% 无服务器,这将是复杂或丑陋的。
Retry
或者,使用自定义错误代码和子句在状态机中进行 S3 轮询循环如果您可以为后台工作人员分配“1/8 微”实例,它并不优雅但很容易,并且可以通过即时反应来实现。低硬件要求暗示我们将只使用机器进行同步。
定义 StepFunction 活动,例如命名为video-duration
。为即时反应定义 SQS 队列或为持续时间结果轮询 S3。
状态函数伪代码:
{
StartAt: ffprobe
ffprobe: {
Type: Task
Resource: arn:...lambda:launch-ffprobe
Next: wait-duration
}
wait-duration: {
Type: Task
Resource: arn...activity:video-duration
End: true
}
}
后台工作者伪代码:
statemap = dict/map filename to result
thread1:
loop:
taskToken, input = SF.GetActivityTask('video-duration') # long poll
sync(key=input.filename, waiter=taskToken)
thread2:
loop:
msg = SQS.ReceiveMessage(...) # or poll S3
sync(key=msg.filename, duration=msg.result)
function sync(key, waiter, duration):
state = statemap[key]
if waiter:
state.waiter = waiter
if duration:
state.duration = duration
if state.waiter and state.duration:
SF.SendTaskSuccess(state.waiter, state.duration)
S3触发伪代码:
if filename is video:
SF.StartExecution(...)
else if filename is duration:
content = S3.GetObject(filename)
SQS.SendMessage(queue, content)
当我尝试结合 SFN 来编排 AWS Batch 作业时,我也遇到了这个问题。上面建议的做法是有问题的,因为您应该传递 taskToken,因此您需要从状态机内的 lambda 轮询 TaskToken 从队列中,并将其传递给 S3 或其他地方,另一个 lambda 将提交活动状态.
问题是:当您轮询 taskToken 时,您无法知道它是否属于您的状态机实例。您可以改为在同一状态机的另一个实例上获取令牌。就个人而言,我认为如果 AWS 能够支持这个功能会很棒,他们很容易做到......
您可以将其中的 API Gateway 替换为 AWS Lambda 函数,例如由 S3 事件触发(文档:http ://docs.aws.amazon.com/lambda/latest/dg/with-s3.html )。只要确保您的任务有适当的超时。
如果您知道 transloadit 完成后会将文件放在 S3 中的哪个位置,您可以循环轮询 S3。要进行轮询,您可以使用HeadObject然后检查响应的状态代码。
AWS Step Function 文档中的一个示例项目描述了这种轮询循环。无需使用您必须为执行付费的 Lambda,您可以直接请求 S3 API,如此处所述。如果没有 Lambda,您只需为标准工作流中的状态转换付费。