1

好的,gpars 是全新的,所以如果这有一个明显的答案,请原谅我。

这是我的场景。我们目前有一段代码包装在 Thread.start {} 块中。它这样做是为了可以在后台将消息发送到消息队列而不阻塞用户请求。我们最近遇到的一个问题是对于大型工作块,用户可能会执行另一个操作,这会导致该块再次执行。由于它是线程化的,因此第二批消息可能会在第一批消息之前发送,从而导致数据损坏。

我想将此过程更改为与 gpars 一起作为队列流工作。我见过创建池的示例,例如

def pool = GParsPool.createPool()

或者

def pool = new ForkJoinPool()

然后将池用作

GParsPool.withExistingPool(pool) {
    ...
}

这似乎可以解释如果用户再次执行操作,我可以重用创建的池并且操作不会乱序执行,前提是我的池大小为 1。

我的问题是,这是使用 gpar 执行此操作的最佳方法吗?此外,我如何知道池何时完成所有工作?当所有工作完成时它会终止吗?如果是这样,是否有一种方法可用于检查池是否已完成/终止以知道我需要一个新池?

任何帮助,将不胜感激。

4

2 回答 2

0

不,显式创建的池不会自行终止。您必须明确调用它们的 shutdown()。

但是,使用 withPool() {} 命令将保证在代码块完成后销毁池。

于 2015-05-12T06:15:30.803 回答
0

这是我们对问题的当前解决方案。应该注意的是,由于我们的要求,我们遵循了这条路线

  • 工作按某些上下文分组
  • 给定上下文中的工作是有序的
  • 给定上下文中的工作是同步的
  • 上下文的附加工作应该在前面的工作之后执行
  • 工作不应阻塞用户请求
  • 上下文之间是异步的
  • 完成上下文的工作后,上下文应自行清理

鉴于上述情况,我们实现了以下内容:

class AsyncService {
    def queueContexts


    def AsyncService() {
        queueContexts = new QueueContexts()
    }

    def queue(contextString, closure) {
        queueContexts.retrieveContextWithWork(contextString, true).send(closure)
    }

    class QueueContexts {
        def contextMap = [:]

        def synchronized retrieveContextWithWork(contextString, incrementWork) {
            def context = contextMap[contextString]

            if (context) {
                if (!context.hasWork(incrementWork)) {
                    contextMap.remove(contextString)
                    context.terminate()
                }
            } else {
                def queueContexts = this
                contextMap[contextString] = new QueueContext({->
                    queueContexts.retrieveContextWithWork(contextString, false)
                })
            }

            contextMap[contextString]
        }

        class QueueContext {
            def workCount
            def actor

            def QueueContext(finishClosure) {
                workCount = 1
                actor = Actors.actor {
                    loop {
                        react { closure ->
                            try {
                                closure()
                            } catch (Throwable th) {
                                log.error("Uncaught exception in async queue context", th)
                            }

                            finishClosure()
                        }
                    }
                }
            }

            def send(closure) {
                actor.send(closure)
            }

            def terminate(){
                actor.terminate()
            }

            def hasWork(incrementWork) {
                workCount += (incrementWork ? 1 : -1)
                workCount > 0
            }
        }
    }
}
于 2015-05-14T13:38:10.270 回答