0

我有一个django-viewflow工作流,其中包括一个 3-way Split() 来处理电子邮件、SMS 等。由于这些活动中的每一个都可能需要很长时间才能完成,所以我将 3 个拆分分支中的每一个都表示为节点对:

  • 产生 Celery 作业的普通 Handler() 节点。
  • 一个自定义的等待芹菜节点。

自定义节点如下所示:

class CeleryEvent(mixins.TaskDescriptionViewMixin,
                  mixins.NextNodeMixin, mixins.DetailViewMixin, 
                  mixins.UndoViewMixin,
                  mixins.CancelViewMixin, Event):
    ....
    activation_class = derived-from-AbstractJobActivation
    setting task_type = "somestring"

对 Celery 作业完成的 Viewflow 代码的调用遵循另一个问题的模型,并且特别包括其中包含的锁。通常,结果工作正常。但是,偶尔,我会从 Viewflow 1.3.0 的 join.py 中得到这个异常:

tasks = flow_class.task_class._default_manager.filter(
        flow_task=flow_task,
        process=process,
        status=STATUS.STARTED)

if len(tasks) > 1:
    raise FlowRuntimeError('More than one join instance for process found')

3个分支像这样加入:

close_join = flow.Join(wait_all=True). \
    Next(this.alert_devops)

我对此感到有些困惑,因为在错误后的检查中,处于 STARTEDclose_join状态的 process 和flow_task的组合确实发生了两次。我想知道我正在做的事情是否可能导致问题。据我所知,我的代码实际上都没有直接写入这个表。

我确实注意到 Task 表没有unique_together('process', 'flow_task')认为可能是因为 Viewflow 循环会导致同一个 flow_task 被多次命中。由于我的代码(还)没有循环,我想知道临时添加这样的约束是否是个好主意;至少那时非法国家的创造者会是失败点吗?

被占用的锁是否有可能跨进程不安全?由于 Celery 正在机器中的多个进程上运行这段代码,这可能会解释问题吗?

    lock = self.flow_class.lock_impl(self.flow_class.instance)
    with lock(self.flow_class, task.process_id):
        #
        # Re-acquire the task under a lock (see the StackOverflow thread).
        #
        task = self.flow_class.task_class._default_manager.get(pk=task.pk)
        activation = self.activation_class()
        activation.initialize(self, task)
        activation.start()
        activation.done()
4

1 回答 1

0

我相信缺乏锁定确实是问题所在。根据文档

默认情况下不启用锁定。您需要选择正确的锁实现并启用它。

于 2018-09-19T11:53:18.630 回答