16

概要

我正在开发一个 Web 应用程序来学习 Django(python 3.4 和 Django 1.6.10)。Web 应用程序具有复杂且经常更新的工作流程。我决定集成 Django-Viewflow 库(https://github.com/viewflow/viewflow/),因为它似乎是一种处理工作流的非常方便的方法,并且不会将工作流逻辑与应用程序模型结合起来。

在这种情况下,我创建了一个工作流来使用 Django-Viewflow 库收集作者信息和版权。每次将作者添加到书籍时,都应启动工作流。

我的问题

该文档提供了集成端到端工作流解决方案(前端和后端)的分步指南。我的问题是我很难以编程方式控制工作流程(特别是来自 Book 模型)。

申请说明

我有一个与作者有多对多关系的 Book 模型(核心模型)。

myApp/models.py

class Book(models.Model):
    title = models.CharField(max_length=100)
    authors = models.ManyToManyField(Author)
    publisher = models.ForeignKey(Publisher)
    publication_date = models.DateField()

工作流组件是:

myFlow/models.py

from viewflow.models import Process

class AuthorInvitation(process)     
    consent_confirmed = models.BooleanField(default=False)
    signature = models.CharField(max_length=150) 

myFlow/flows.py

from viewflow import flow
from viewflow.base import this, Flow
from viewflow.contrib import celery
from viewflow.views import StartProcessView, ProcessView
from . import models, tasks

class AuthorInvitationFlow(Flow):
    process_cls = models.AuthorInvitation

    start = flow.Start(StartProcessView) \
        .Permission(auto_create=True) \
        .Next(this.notify)

    notify = celery.Job(tasks.send_authorship_request) \
        .Next(this.approve)

    approve = flow.View(ProcessView, fields=["confirmed","signature"]) \
        .Permission(auto_create=True) \
        .Next(this.check_approve)

    check_approve = flow.If(cond=lambda p: p.confirmed) \
        .OnTrue(this.send) \
        .OnFalse(this.end)

    send = celery.Job(tasks.send_authorship) \
        .Next(this.end)

    end = flow.End()

问题

如何以编程方式控制工作流程(激活、确认步骤、重做步骤、取消流程……)?我试图深入研究图书馆的代码。似乎class activate包含正确的方法,但不确定应该如何编排整体。

提前致谢!

4

3 回答 3

12

有两个额外的 Start 内置任务可用于 Flows

StartFunction - 当函数在某处调用时启动流程:

@flow_start_func
def create_flow(activation, **kwargs):
    activation.prepare()
    activation.done()
    return activation

class FunctionFlow(Flow):
    start = flow.StartFunction(create_flow) \
        .Next(this.end)

# somewhere in the code
FunctionFlow.start.run(**some_kwargs)

StartSignal - 在 django 信号接收时启动流程:

class SignalFlow(Flow):
    start = flow.StartSignal(some_signal, create_flow) \      
        .Next(this.end)

您可以在此视图流测试套件中检查它们的使用情况以及其他内置任务。

对于手动处理任务状态,首先应该从数据库中获取任务,激活它,然后调用任何激活方法。

task  = MyFlow.task_cls.objects.get(...)
activation = task.activate()
if  activation.undo.can_proceed():
    activation.undo()

任何激活转换都有.can_proceed()方法,帮助您检查,是处于允许转换状态的任务。

于 2015-02-26T07:47:11.953 回答
1

我需要能够手动或以编程方式启动流实例。根据上述参考,我最终得到的模型StartFunction如下所示:

class MyRunFlow(flow.Flow):
    process_class = Run

    start = flow.Start(ProcessCreate, fields=['schedule']). \
        Permission(auto_create=True). \
        Next(this.wait_data_collect_start)
    start2 = flow.StartFunction(process_create). \
        Next(this.wait_data_collect_start)

请注意,重要的一点是process_create具有Process对象,并且此代码必须以编程方式设置手动表单提交通过字段规范执行的相同字段ProcessCreate

@flow_start_func
def process_create(activation: FuncActivation, **kwargs):
    #
    # Update the database record.
    #
    db_sch = Schedule.objects.get(id=kwargs['schedule'])
    activation.process.schedule = db_sch # <<<< Same fields as ProcessCreate
    activation.process.save()
    #
    # Go!
    #
    activation.prepare()
    activation.done()
    return activation

请注意,is中的activation子类具有 prepare() 和 save() 方法。来自运行调用,类似于:flow_start_funcFuncActivationkwargs

start_node = <walk the flow class looking for nodes of type StartFunction>
activation = start_node.run(schedule=self.id)
于 2018-04-30T16:57:07.120 回答
0

手动启动:

from viewflow import flow
from viewflow.base import this, Flow
from .models import CIE
from viewflow import frontend
from django.utils.decorators import method_decorator

@frontend.register
class CIE(Flow):
    process_class = CIE
    start = flow.StartFunction(this.create_flow).Next(this.end)
    end = flow.End()

    @method_decorator(flow.flow_start_func)
    def create_flow(self, activation):
        activation.prepare()
        activation.done()
        return activation

之后公开 rpc:

from modernrpc.core import rpc_method
from flow.flows import CIE

    @rpc_method
    def start():
        CIE.start.run()
于 2019-05-31T14:56:36.210 回答