我有以下 SWF 工作流,它异步调用 2 个活动,获取它们的结果,并将其传递给第三个活动:
a_future = Future.new.set
b_future = Future.new.set
a_future = activity.send_async(:a_activity, arg1)
b_future = activity.send_async(:b_activity, arg2)
wait_for_all(a_future, b_future)
a_url = a_future.get
b_url = b_future.get
activity.c_activity(arg1, a_url, b_url)
当按原样运行时,我在a_url
and中得到预期的返回值(在这种情况下是一个 URL) b_url
。
但是,当我向活动添加重试逻辑时
activity :a_activity, :b_activity, :c_activity do
{
version: "0.0.1",
default_task_list: $activity_task_list,
default_task_schedule_to_start_timeout: 30,
default_task_start_to_close_timeout: 30,
# ADD the next 3 lines for retry logic
exponential_retry: {
maximum_attempts: 5,
}
}
end
from 的值a_future.get
不是字符串 URL,而是:
#<AWS::Flow::Utilities::AddressableFuture:0x007fe87243d908>
我一直无法弄清楚如何从中获得结果AddressableFuture
。
我尝试编写一些可以使用和不使用重试逻辑的包装器代码:
def get_return_value(future)
value = future.get
if value.kind_of? AWS::Flow::Utilities::AddressableFuture
value = value.return_value.get
end
return value
end
接着:
a_url = get_return_value(a_future)
b_url = get_return_value(b_future)
...但这只是绕圈子,仍然没有得到结果。
当活动具有重试逻辑时,关于如何从两个活动中获取返回值并将其传递给第三个活动的任何想法?