0

我很难弄清楚为什么我一直看到下面这个异常:

`HibernateOptimisticLockingFailureException: FlowExecution: optimistic locking failed; nested exception is org.hibernate.StaleObjectStateException: Row was updated or deleted by another transaction (or unsaved-value mapping was incorrect)`

我有一个使用 Quartz Scheduler 来触发作业的服务。在我的上下文中,这些作业被称为 Flows,每个流可能由多个 Tasks 组成;流和任务都是 Executables,有关它们实际执行(Executions)的信息分别存储为 FlowExecutions 和 TaskExecutions。该服务使用 FlowService 来启动流。

UPD:有一个 Quartz Job,“ExecutorJob”,负责触发我的流程/任务。当它被触发时,它使用 FlowService 来启动它应该执行的任何内容。所以我想知道,是否有可能 Quartz 线程在每次使用该服务时并没有创建新的 Hibernate 会话,而这正是问题的原因。我没有改变 FlowService 的作用域,所以它是一个单例;GORM 是如何管理它所使用的会话的?

UPD2:尝试在 ExecutorJob 上使用 persistenceContextInterceptor 以确保每次使用该服务都使用一个新会话,但它没有解决问题。添加了 ExecutorJob 的简化代码。

我无法在本地重现该问题,但它在生产环境中经常发生,尤其是在需要启动大量流程时。我尝试过把任务和流程的 execute 方法设为同步(synchronized),但没有奏效;我现在正在尝试使用悲观锁,但我猜它也解决不了问题,因为从应用程序日志来看,似乎并没有两个线程在更新同一行。下面我给出一个简化版本的代码来模拟项目的结构。

// ------------------
// DOMAIN CLASSES
// ------------------
/**
 * Common base for the executable domain types in this file (Flow and Task extend it).
 *
 * Executions are not held as a mapped collection: 'executions' is listed in
 * transients and is looked up with a finder each time it is read.
 */
abstract class Executable {
    static transients = ['executions']
    static hasMany = [flowTasks: FlowTask]

    /**
     * Looks up the executions that reference this executable.
     *
     * @return the result of Execution.findAllByExecutable(this), or an empty
     *         list when this instance has no id yet
     */
    List<Execution> getExecutions() {
        if (!this.id) {
            return []
        }
        return Execution.findAllByExecutable(this)
    }

    /**
     * Points the given execution at this executable and saves the execution.
     *
     * @param execution the execution to associate and save
     */
    void addToExecutions(Execution execution) {
        execution.executable = this
        execution.save()
    }

    /**
     * Starts this executable with the given parameters.
     *
     * @param params parameters for the run
     * @return the executions that were started
     */
    abstract List<Execution> execute(Map params)
}

/**
 * An Executable made up of FlowTasks.
 *
 * Both execute paths are serialized on static lock objects, i.e. one lock per
 * class for every Flow instance loaded by this class.
 * NOTE(review): these locks only order threads inside one JVM; they do not by
 * themselves stop another session/transaction from changing the same row.
 */
class Flow extends Executable {
    SortedSet<FlowTask> tasks
    static hasMany = [tasks: FlowTask]

    private static final Object lockExecute = new Object()
    private static final Object lockExecuteTask = new Object()

    /**
     * Creates one FlowExecution per parameter map returned by multiplyParams
     * and runs the flow's first tasks for each of them.
     *
     * @param params parameters for the run
     * @return the FlowExecutions that were created, in multiplyParams order
     */
    List<FlowExecution> execute(Map params) {
        synchronized (lockExecute) {
            List<Map> multiParams = multiplyParams(params)
            multiParams.collect { Map param ->
                FlowExecution flowExecution = new FlowExecution()
                // addToExecutions sets flowExecution.executable and saves it.
                addToExecutions(flowExecution)
                flowExecution.save()
                this.attach()
                save()
                executeTasks(firstTasks(param), flowExecution, param)
            }
        }
    }

    /**
     * Expands one parameter map into the list of parameter maps to run.
     *
     * @param params the incoming parameters
     * @return a single-element list holding params (simplified implementation)
     */
    List<Map> multiplyParams(Map params) {
        // creates a list of params for the executions that must be started
        [params]
    }

    /**
     * Selects the tasks that start the flow.
     *
     * @param params parameters for the run (unused by this simplified version)
     * @return every task of the flow; an empty set when there are none
     */
    Set<FlowTask> firstTasks(Map params) {
        // finds the first tasks to be executed for the flow
        if (!tasks) {
            // No tasks (null or empty collection): nothing to start.
            return [] as Set<FlowTask>
        }
        // Was 'findAdll', which is not a method and fails at runtime.
        tasks.findAll { true }
    }

    /**
     * Runs each given task, links the resulting executions to flowExecution,
     * then saves this flow with a flush.
     *
     * @param tasks the tasks to execute
     * @param flowExecution the flow execution the task executions belong to
     * @param params parameters passed to every task
     * @return flowExecution
     * @throws HibernateOptimisticLockingFailureException rethrown from save(flush: true)
     */
    private FlowExecution executeTasks(Set<FlowTask> tasks, FlowExecution flowExecution, Map params) {
        synchronized (lockExecuteTask) {
            tasks.each { FlowTask flowTask ->
                try {
                    List<Execution> executions = flowTask.execute(params)
                    executions.each { Execution execution ->
                        flowExecution.addToExecutions(execution)
                    }
                    flowExecution.attach()
                } catch (Exception e) {
                    // The original 'catch {' had no parameter, so 'e' was undefined.
                    // log error executing task
                    throw e
                }
            }

            this.attach()
            try {
                save(flush: true)
            } catch (HibernateOptimisticLockingFailureException e) {
                // log error saving flow
                throw e
            }

            flowExecution
        }
    }

}

/**
 * An Executable that is started through taskService.
 *
 * NOTE(review): 'taskService' is not declared in this class as shown; it is
 * presumably injected elsewhere -- confirm.
 */
class Task extends Executable {
    private static final Object lockExecute = new Object()
    private static final Object lockGetExecution = new Object()

    /**
     * Hands an already created execution to taskService.start.
     *
     * @param execution the execution to start
     * @return the same execution
     */
    TaskExecution execute(TaskExecution execution) {
        taskService.start(execution)
        return execution
    }

    /**
     * Creates and starts one TaskExecution per parameter map returned by
     * multiplyParams, serialized on a class-wide lock.
     *
     * @param params parameters for the run
     * @return the started executions, in multiplyParams order
     */
    List<TaskExecution> execute(Map params) {
        synchronized (lockExecute) {
            return multiplyParams(params).collect { Map singleRunParams ->
                TaskExecution prepared = getExecution(singleRunParams)
                execute(prepared)
            }
        }
    }

    /**
     * Builds a TaskExecution for this task, saves it through addToExecutions,
     * then attaches the involved objects and saves this task with a flush.
     *
     * @param params parameters stored on the new execution
     * @return the newly created execution
     * @throws HibernateOptimisticLockingFailureException rethrown from save(flush: true)
     */
    TaskExecution getExecution(Map params) {
        synchronized (lockGetExecution) {
            TaskExecution created = new TaskExecution(executable: this)
            created.setParameters(params)
            addToExecutions(created)

            created.attach()
            created.flowExecution?.attach()
            this.attach()

            try {
                save(flush: true)
            } catch (HibernateOptimisticLockingFailureException lockingFailure) {
                // log error saving task
                throw lockingFailure
            }

            return created
        }
    }

    /**
     * Expands one parameter map into the list of parameter maps to run.
     *
     * @param params the incoming parameters
     * @return a single-element list holding params (simplified implementation)
     */
    List<Map> multiplyParams(Map params) {
        // creates a list of params for the tasks that must be started
        return [params]
    }

}

/**
 * Links a Flow to one of the Executables it runs.
 */
class FlowTask {
    static belongsTo = [flow: Flow, executable: Executable]

    /**
     * Delegates to the linked executable.
     *
     * @param params parameters forwarded unchanged
     * @return whatever executable.execute(params) returns
     */
    List<Execution> execute(Map params) {
        return this.executable.execute(params)
    }
}

/**
 * Base record of a single run of an Executable.
 *
 * Parameter values are kept in parameterData after being passed through
 * JsonParser.toJson.
 */
abstract class Execution {
    Map parameterData = [:]
    static belongsTo = [executable: Executable, flowExecution: FlowExecution]
    static transients = ['parameters', 'taskExecutions']

    /**
     * Stores each entry of params in parameterData under the same key, with
     * the value converted by JsonParser.toJson.
     *
     * @param params the parameters to store
     */
    void setParameters(Map params) {
        params.each { entry ->
            parameterData[entry.key] = JsonParser.toJson(entry.value)
        }
    }
}

/**
 * Execution created by Task.getExecution for a run of a Task.
 * Declares no members of its own; everything comes from Execution.
 */
class TaskExecution extends Execution {
}

/**
 * Execution of a Flow, holding the executions started on its behalf.
 *
 * 'executions' is transient: it is kept in memory, filled by addToExecutions
 * and re-read with a finder in onLoad.
 */
class FlowExecution extends Execution {
    List<Execution> executions
    static transients = ['executions']

    FlowExecution() {
        executions = []
    }

    /**
     * Collects the 'taskExecution' value of every held execution into a set.
     * NOTE(review): 'taskExecution' is not declared on Execution as shown here;
     * confirm where that property comes from.
     *
     * @return the flattened set, or null when executions is null
     */
    Set<TaskExecution> getTaskExecutions() {
        executions?.collect { Execution execution ->
            return execution.taskExecution
        }?.flatten()?.toSet()
    }

    /**
     * Adds the execution to the in-memory list, points it at this flow
     * execution and saves it.
     *
     * @param execution the execution to link and save
     */
    void addToExecutions(Execution execution){
        // getTaskExecutions treats the list as nullable, so do the same here.
        if (executions == null) {
            executions = []
        }
        executions.add(execution)
        execution.flowExecution = this
        execution.save()
    }

    /**
     * Reloads the transient executions list with Execution.findAllByFlowExecution.
     * On failure the error is logged and the list falls back to empty.
     */
    def onLoad() {
        try {
            executions = this.id ? Execution.findAllByFlowExecution(this) : []
        } catch (Exception e){
            // Pass the exception as the throwable so the stack trace is logged.
            log.error("Could not load executions for FlowExecution " + this.id, e)
            // The original bare '[]' was a dead expression; assign the fallback.
            executions = []
        }
    }
}

// -----------------
// SERVICE CLASSES
// -----------------
/**
 * Service used to start flows.
 */
class FlowService {

    /**
     * Loads the flow with Flow.lock and executes it.
     *
     * @param flowId id of the flow to start
     * @param params parameters passed to the flow
     * @return a map with 'data' and 'status' entries describing the outcome
     * @throws IllegalArgumentException when no flow exists for flowId
     */
    Map start(long flowId, Map params) {
        Flow flow = Flow.lock(flowId)
        if (flow == null) {
            // Fail with a clear message instead of a NullPointerException later.
            throw new IllegalArgumentException("No Flow found with id " + flowId)
        }

        startFlow(flow, params)
    }

    /**
     * Executes the flow and builds the success response.
     *
     * @param flow the flow to execute
     * @param params parameters passed to the flow
     * @return [data: [success: true], status: HTTP_OK]
     */
    private Map startFlow(Flow flow, Map params) {
        // The result was previously held in an unused local typed List<RunningFlow>,
        // although Flow.execute returns List<FlowExecution>.
        flow.execute(params)

        [data: [success: true], status: HTTP_OK]
    }
}

//--------------------------------------
// Quartz job
//--------------------------------------
/**
 * Quartz job that starts an Executable through flowService or taskService.
 *
 * Each run is wrapped in the 'persistenceInterceptor' bean (init / flush /
 * destroy) and serialized on LockContainer.taskLock.
 */
class ExecutorJob implements InterruptableJob {

    def grailsApplication = Holders.getGrailsApplication()

    static triggers = {}

    // Written by the job thread in execute() and read by interrupt(), which
    // may be called from a different thread, hence volatile.
    private volatile Thread thread

    /**
     * Reads 'executableId' from the merged job data and starts the matching
     * executable with the whole job data map as parameters.
     *
     * @param context the Quartz execution context
     * @throws JobExecutionException declared by the Quartz Job contract
     */
    void execute(JobExecutionContext context) throws JobExecutionException {
        thread = Thread.currentThread()
        synchronized (LockContainer.taskLock) {
            Map params = context.mergedJobDataMap
            def persistenceInterceptor = persistenceInterceptorInstance

            try {
                persistenceInterceptor.init()

                Long executableId = params.executableId as Long

                def service = (Executable.get(executableId) instanceof Flow) ? flowServiceInstance : taskServiceInstance
                service.start(executableId, params)
            } catch (Exception e) {
                // log error
            } finally {
                try {
                    persistenceInterceptor.flush()
                } finally {
                    // Always tear the interceptor down, even when flush() throws;
                    // previously a failing flush skipped destroy().
                    persistenceInterceptor.destroy()
                }
            }
        }
    }

    /** @return the 'persistenceInterceptor' bean from the main context */
    PersistenceContextInterceptor getPersistenceInterceptorInstance() {
        grailsApplication.mainContext.getBean('persistenceInterceptor')
    }

    /** @return the 'flowService' bean from the main context */
    FlowService getFlowServiceInstance() {
        // Return type was 'FluxoService'; the service class in this file is FlowService.
        grailsApplication.mainContext.getBean('flowService')
    }

    /**
     * @return the 'taskService' bean from the main context
     */
    // NOTE(review): TarefaService is not declared in the code shown; it
    // presumably should be the class behind the 'taskService' bean -- confirm.
    TarefaService getTaskServiceInstance() {
        grailsApplication.mainContext.getBean('taskService')
    }

    /**
     * Interrupts the thread that last entered execute(), if there is one.
     */
    @Override
    void interrupt() throws UnableToInterruptJobException {
        thread?.interrupt()
    }
}

有人知道什么可能有帮助的办法吗?

4

1 回答 1

0

嗯,很难看出到底哪里出了问题。不过我猜,当会话中的某个对象已经被其他事务保存或更新过时,就会抛出这个错误。这时 Hibernate 再尝试保存该对象,就会报 “Row was updated by another transaction” 错误。

我想您可以尝试在保存对象之前先调用 refresh(),看看效果如何。

http://grails.github.io/grails-doc/2.3.4/ref/Domain%20Classes/refresh.html

// Fetch the Book with id 1.
def b = Book.get(1)
…
// Call refresh() on the loaded instance (see the linked Grails refresh docs) before saving it.
b.refresh()
于 2015-09-25T18:39:57.733 回答