我有一个django-viewflow工作流,其中包括一个 3-way Split() 来处理电子邮件、SMS 等。由于这些活动中的每一个都可能需要很长时间才能完成,所以我将 3 个拆分分支中的每一个都表示为节点对:
- 产生 Celery 作业的普通 Handler() 节点。
- 一个自定义的等待芹菜节点。
自定义节点如下所示:
class CeleryEvent(mixins.TaskDescriptionViewMixin,
mixins.NextNodeMixin, mixins.DetailViewMixin,
mixins.UndoViewMixin,
mixins.CancelViewMixin, Event):
....
activation_class = derived-from-AbstractJobActivation
setting task_type = "somestring"
对 Celery 作业完成的 Viewflow 代码的调用遵循另一个问题的模型,并且特别包括其中包含的锁。通常,结果工作正常。但是,偶尔,我会从 Viewflow 1.3.0 的 join.py 中得到这个异常:
tasks = flow_class.task_class._default_manager.filter(
flow_task=flow_task,
process=process,
status=STATUS.STARTED)
if len(tasks) > 1:
raise FlowRuntimeError('More than one join instance for process found')
3个分支像这样加入:
close_join = flow.Join(wait_all=True). \
Next(this.alert_devops)
我对此感到有些困惑,因为在错误后的检查中,处于 STARTEDclose_join
状态的 process 和flow_task的组合确实发生了两次。我想知道我正在做的事情是否可能导致问题。据我所知,我的代码实际上都没有直接写入这个表。
我确实注意到 Task 表没有,我unique_together('process', 'flow_task')
认为这可能是因为 Viewflow 循环会导致同一个 flow_task 被多次命中。由于我的代码(还)没有循环,我想知道临时添加这样的约束是否是个好主意;至少那时非法国家的创造者会是失败点吗?
被占用的锁是否有可能跨进程不安全?由于 Celery 正在机器中的多个进程上运行这段代码,这可能会解释问题吗?
lock = self.flow_class.lock_impl(self.flow_class.instance)
with lock(self.flow_class, task.process_id):
#
# Re-acquire the task under a lock (see the StackOverflow thread).
#
task = self.flow_class.task_class._default_manager.get(pk=task.pk)
activation = self.activation_class()
activation.initialize(self, task)
activation.start()
activation.done()