亚马逊正在为未来的开发推广 boto3,但没有为新的 boto3 提供足够的文档。
有没有人愿意分享将 SWF 与 boto3 一起使用的示例代码?
亚马逊正在为未来的开发推广 boto3,但没有为新的 boto3 提供足够的文档。
有没有人愿意分享将 SWF 与 boto3 一起使用的示例代码?
这是迄今为止我发现的唯一示例:
https://github.com/jhludwig/aws-swf-boto3
所以流程概述看起来像这样(注意这是直接从上面的链接中提取的,但添加了一些额外的注释和更多的流程)。
需要注意的是,SWF 对事物的名称进行操作。为这些名称赋予执行意义取决于您的代码。例如,您Decider
将轮询并使用任务名称来决定下一步做什么。
有些事情我不太确定。TASKLIST
我相信引用是一种命名空间。它不是真正的事物列表,它更多的是按名称隔离事物。现在我可能完全错了,从我的基本理解来看,这就是我认为的意思。
你可以从任何地方运行你的决策者和工人。由于它们可以访问 AWS,因此如果您的防火墙允许 0.0.0.0/0 出口,您将可以访问。
AWS Docs 还提到您可以运行 lambda,但我还没有找到如何触发它。
import boto3
from botocore.exceptions import ClientError
swf = boto3.client('swf')
try:
swf.register_domain(
name=<DOMAIN>,
description="Test SWF domain",
workflowExecutionRetentionPeriodInDays="10" # keep history for this long
)
except ClientError as e:
print "Domain already exists: ", e.response.get("Error", {}).get("Code")
创建域后,我们现在注册工作流:
try:
swf.register_workflow_type(
domain=DOMAIN, # string
name=WORKFLOW, # string
version=VERSION, # string
description="Test workflow",
defaultExecutionStartToCloseTimeout="250",
defaultTaskStartToCloseTimeout="NONE",
defaultChildPolicy="TERMINATE",
defaultTaskList={"name": TASKLIST } # TASKLIST is a string
)
print "Test workflow created!"
except ClientError as e:
print "Workflow already exists: ", e.response.get("Error", {}).get("Code")
注册工作流程后,我们现在可以开始分配任务了。
您可以分配N个任务。请记住,这些主要是字符串,您的代码将赋予它们执行意义。
try:
swf.register_activity_type(
domain=DOMAIN,
name="DoSomething",
version=VERSION, # string
description="This is a worker that does something",
defaultTaskStartToCloseTimeout="NONE",
defaultTaskList={"name": TASKLIST } # TASKLIST is a string
)
print "Worker created!"
except ClientError as e:
print "Activity already exists: ", e.response.get("Error", {}).get("Code")
创建了域、工作流和任务后,我们现在可以开始工作流了。
import boto3
swf = boto3.client('swf')
response = swf.start_workflow_execution(
domain=DOMAIN # string,
workflowId='test-1001',
workflowType={
"name": WORKFLOW,# string
"version": VERSION # string
},
taskList={
'name': TASKLIST
},
input=''
)
print "Workflow requested: ", response
请注意workflowId
,这是一个自定义标识符,例如str(uuid.uuid4())
。从文档:
与工作流执行关联的用户定义标识符。您可以使用它来将自定义标识符与工作流执行相关联。如果工作流执行在逻辑上是先前执行的重新启动,您可以指定相同的标识符。您不能同时使用相同的 workflowId 进行两个打开的工作流执行。
在这一点上,什么都不会发生,因为我们没有Decider
running 也没有任何Workers
. 让我们看看那些是什么样子的。
我们的决策者将进行轮询以获取一项决策任务,以就以下事项做出决策:
import boto3
from botocore.client import Config
import uuid
botoConfig = Config(connect_timeout=50, read_timeout=70)
swf = boto3.client('swf', config=botoConfig)
请注意上面的超时设置。您可以参考此 PR 以了解其背后的基本原理:
https://github.com/boto/botocore/pull/634
来自 Boto3 SWF 文档:
工作人员应将其客户端套接字超时设置为至少 70 秒(比服务可能持有轮询请求的最长时间高 10 秒)。
该 PR 使 boto3 能够执行该功能。
http://boto3.readthedocs.org/en/latest/reference/services/swf.html#SWF.Client.poll_for_decision_task
print "Listening for Decision Tasks"
while True:
newTask = swf.poll_for_decision_task(
domain=DOMAIN ,
taskList={'name': TASKLIST }, # TASKLIST is a string
identity='decider-1', # any identity you would like to provide, it's recorded in the history
reverseOrder=False)
if 'taskToken' not in newTask:
print "Poll timed out, no new task. Repoll"
elif 'events' in newTask:
eventHistory = [evt for evt in newTask['events'] if not evt['eventType'].startswith('Decision')]
lastEvent = eventHistory[-1]
if lastEvent['eventType'] == 'WorkflowExecutionStarted':
print "Dispatching task to worker", newTask['workflowExecution'], newTask['workflowType']
swf.respond_decision_task_completed(
taskToken=newTask['taskToken'],
decisions=[
{
'decisionType': 'ScheduleActivityTask',
'scheduleActivityTaskDecisionAttributes': {
'activityType':{
'name': TASKNAME, # string
'version': VERSION # string
},
'activityId': 'activityid-' + str(uuid.uuid4()),
'input': '',
'scheduleToCloseTimeout': 'NONE',
'scheduleToStartTimeout': 'NONE',
'startToCloseTimeout': 'NONE',
'heartbeatTimeout': 'NONE',
'taskList': {'name': TASKLIST}, # TASKLIST is a string
}
}
]
)
print "Task Dispatched:", newTask['taskToken']
elif lastEvent['eventType'] == 'ActivityTaskCompleted':
swf.respond_decision_task_completed(
taskToken=newTask['taskToken'],
decisions=[
{
'decisionType': 'CompleteWorkflowExecution',
'completeWorkflowExecutionDecisionAttributes': {
'result': 'success'
}
}
]
)
print "Task Completed!"
请注意,在此代码段的末尾,我们会检查是否有ActivityTaskCompleted
,并以决定CompleteWorkflowExecution
让 SWF 知道我们已经完成作为回应。
那是决定者,工人是什么样的?
http://boto3.readthedocs.org/en/latest/reference/services/swf.html#SWF.Client.poll_for_activity_task
再次注意,我们设置read_timeout
import boto3
from botocore.client import Config
botoConfig = Config(connect_timeout=50, read_timeout=70)
swf = boto3.client('swf', config=botoConfig)
现在我们开始我们的工人轮询:
print "Listening for Worker Tasks"
while True:
task = swf.poll_for_activity_task(
domain=DOMAIN,# string
taskList={'name': TASKLIST}, # TASKLIST is a string
identity='worker-1') # identity is for our history
if 'taskToken' not in task:
print "Poll timed out, no new task. Repoll"
else:
print "New task arrived"
swf.respond_activity_task_completed(
taskToken=task['taskToken'],
result='success'
)
print "Task Done"
我们再次向 SWF 发出信号,表明我们已经完成了我们的工作。
官方文档的链接是[这里][1]。
有很多代码示例,只需点击链接或 [this][2] 之一即可。在可用服务部分下,它列出了 boto3 现在支持的所有服务以及详细示例。
一些示例是:boto3 和获取 SWF 的执行计数
import boto3
import datetime
import time
import dateutil.tz
def lambda_handler(event,context):
swfClient = boto3.client('swf')
currentTimeZone = dateutil.tz.gettz('Australia/Brisbane')
latestDate = datetime.datetime.now(tz=currentTimeZone)
oldestDate = latestDate - datetime.timedelta(1)
fullTextPreloadResponse = swfClient.count_open_workflow_executions(
domain=domainName,
startTimeFilter={
'oldestDate': oldestDate,
'latestDate': latestDate
},
typeFilter={
'name': 'NAME_OF_YOUR_SWF_WORKFLOW_NAME',
'version': 'VERSION_NUMBER'
}
)
print("the count is " + str(fullTextResponse['count']))
print(fullTextResponse)
这是我在我的案例中用来获取正在运行的 SWF 工作流类型的计数的方法。我使用的格式在上面提到的文档中有很好的定义。
要简单地结合使用 boto3 和 SWF,首先要在 python lambda 函数中导入 boto3。然后正在添加 python DateTime。然后一个 boto3.client 设置我们可以使用的客户端 | 与 SWF 交互。
其他例子是:
history = swf.get_workflow_execution_history(
domain= domainName,
execution={
'workflowId': workflowId,
'runId': runId
},
)
希望这个对你有帮助![1]:https ://boto3.amazonaws.com/v1/documentation/api/latest/index.html [2]:https ://boto3.amazonaws.com/v1/documentation/api/latest/reference/services /index.html