3

我一直在使用亚马逊 swf 的流程框架,我希望能够运行优先工作流执行和正常工作流执行。如果有优先任务,那么活动应该在正常优先任务之前获取优先任务。实现这一目标的最佳方法是什么?

我认为以下方法可能有效,但我想知道是否有更好/推荐的方法。

  1. 我将为该活动定义两个活动工作者和两个活动列表。一份优先清单和一份普通清单。每个工人将使用相同的活动类。
  2. 两个工作人员将在同一主机(ec2 实例)上运行。
  3. 在工作流上,我将定义两个方法:startNormalWorkflow 和 startHighWorkflow。在 startHighWorkflow 方法中,我可以使用 ActivitySchedulingOptions 将任务放在高优先级列表中。

这种方法的问题是不能保证高优先级任务在正常任务之前被调度。

4

2 回答 2

2

这是一个很好的问题,它让我摸不着头脑。

当然,给这只猫剥皮的方法不止一种,并且存在许多有效的解决方案。我在这里专注于我能想到的最简单的方法,即在单个工作流中按优先级顺序执行任务。

场景如下:我定义了一个服务于两个任务列表的活动工作者,default_tasks并且urgent_tasks,使用一个简单的逻辑:

  1. 如果urgent_tasks列表中有待处理的任务,则从那里选择一个,
  2. 否则,从default_tasks
  3. 执行任何选定的任务。

问题是如何检查是否有任何高优先级任务未决?CountPendingActivityTasks API 来救援!

我知道您使用Flow进行开发。我的示例是使用boto.swf.layer2Python 编写的,因为它更容易进行原型设计 - 但这个想法保持不变,并且可以扩展到具有高优先级和低优先级工作流执行的更复杂的场景。

因此,要使用boto.swf完成上述操作,请执行以下步骤:

将凭据导出到环境

$ export AWS_ACCESS_KEY_ID=your access key
$ export AWS_SECRET_ACCESS_KEY= your secret key 

获取代码片段

为方便起见,您可以从 github 分叉它:

$ git clone git@github.com:oozie/stackoverflow.git
$ cd stackoverflow/amazon-swf/priority_tasks/

引导域和工作流:

# domain_setup.py 
import boto.swf.layer2 as swf

DOMAIN = 'stackoverflow'
VERSION = '1.0'

swf.Domain(name=DOMAIN).register()
swf.ActivityType(domain=DOMAIN, name='SomeActivity', version=VERSION, task_list='default_tasks').register()
swf.WorkflowType(domain=DOMAIN, name='MyWorkflow', version=VERSION, task_list='default_tasks').register()

决定实施:

# decider.py
import boto.swf.layer2 as swf

DOMAIN = 'stackoverflow'
ACTIVITY = 'SomeActivity'
VERSION = '1.0'

class MyWorkflowDecider(swf.Decider):

    domain = DOMAIN
    task_list = 'default_tasks'
    version = VERSION

    def run(self):
        history = self.poll()
        print history
        if 'events' in history:
            # Get a list of non-decision events to see what event came in last.
            workflow_events = [e for e in history['events']
                               if not e['eventType'].startswith('Decision')]

            decisions = swf.Layer1Decisions()

            last_event = workflow_events[-1]
            last_event_type = last_event['eventType']

            if last_event_type == 'WorkflowExecutionStarted':
                # At the start, get the worker to fetch the first assignment.
                decisions.schedule_activity_task(ACTIVITY+'1', ACTIVITY, VERSION, task_list='default_tasks')
                decisions.schedule_activity_task(ACTIVITY+'2', ACTIVITY, VERSION, task_list='urgent_tasks')
                decisions.schedule_activity_task(ACTIVITY+'3', ACTIVITY, VERSION, task_list='default_tasks')
                decisions.schedule_activity_task(ACTIVITY+'4', ACTIVITY, VERSION, task_list='urgent_tasks')
                decisions.schedule_activity_task(ACTIVITY+'5', ACTIVITY, VERSION, task_list='default_tasks')
            elif last_event_type == 'ActivityTaskCompleted':
                # Complete workflow execution after 5 completed activities.
                closed_activity_count = sum(1 for wf_event in workflow_events if wf_event.get('eventType') == 'ActivityTaskCompleted')
                if closed_activity_count == 5:
                    decisions.complete_workflow_execution()

            self.complete(decisions=decisions)
            return True

优先考虑工作人员的实施:

# worker.py
import boto.swf.layer2 as swf

DOMAIN = 'stackoverflow'
VERSION = '1.0'

class PrioritizingWorker(swf.ActivityWorker):

    domain = DOMAIN
    version = VERSION

    def run(self):

        urgent_task_count = swf.Domain(name=DOMAIN).count_pending_activity_tasks('urgent_tasks').get('count', 0)
        if urgent_task_count > 0:
            self.task_list = 'urgent_tasks'
        else:
            self.task_list = 'default_tasks'
        activity_task = self.poll()

        if 'activityId' in activity_task:
            print urgent_task_count, 'urgent tasks in the queue. Executing ' + activity_task.get('activityId')
            self.complete()
            return True

从交互式 Python shell 的三个实例运行工作流

运行决策者:

$ python -i decider.py
>>> while MyWorkflowDecider().run(): pass
... 

开始执行:

$ python -i decider.py 
>>> swf.WorkflowType(domain='stackoverflow', name='MyWorkflow', version='1.0', task_list='default_tasks').start()

最后,启动 worker 并观察执行的任务:

$ python -i worker.py 
>>> while PrioritizingWorker().run(): pass
... 
2 urgent tasks in the queue. Executing SomeActivity2
1 urgent tasks in the queue. Executing SomeActivity4
0 urgent tasks in the queue. Executing SomeActivity5
0 urgent tasks in the queue. Executing SomeActivity1
0 urgent tasks in the queue. Executing SomeActivity3
于 2013-09-13T22:27:31.093 回答
1

事实证明,使用必须首先检查的单独任务列表效果不佳。

有几个问题。

首先,count API 不能可靠地更新。因此,即使队列中有紧急任务,您也可能获得 0 个任务。

其次,如果没有可用的任务,轮询任务的调用会挂起。因此,当您轮询非紧急任务时,它将“坚持”2 分钟,或者直到您有非紧急任务要做。

因此,这可能会导致您的工作流程中出现各种问题。

为此,SWF 必须实现一个轮询 API,该 API 可以从任务列表的列表中返回第一个任务。那么它会容易得多。

于 2014-11-10T20:41:10.610 回答