
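I wrote a small thread manager in Groovy. The manager takes an array, a task to execute on the array, and a chunk size. The task is then executed on each chunk in a separate thread, and the results are added to a result array.

Here is the class code: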

class ParallelManager {

  static def _threads = []
  static def _threadsFinishedCorrectly = []
  static  def _results = []

  static def runParallelTask( def dataArray, def taskFunc, int chunkSize ){
    assert chunkSize > 0
    assert taskFunc
    if (dataArray.size()==0) return

    assert dataArray.size() >= 0

    def subArray = partitionArray(dataArray, chunkSize)
    assert subArray.size() > 0

    subArray.each{ arrChunk->
        _threads.add( Thread.start{

            def chunkResults = taskFunc(arrChunk)
            assert chunkResults != null 
            _results.add(chunkResults) // EXCEPTION HERE
            _threadsFinishedCorrectly.add(true)
        })
    }

    // wait for all threads to finish
    log.info("Waiting for all threads to finish...")
    _threads.each{ it.join() }

    assert _threadsFinishedCorrectly.size() == _threads.size(),'some threads failed.'
    assert _results.size() == _threads.size() 
    log.info("${_threads.size()} finished.")

    return _results
}

/**
* Util function
* @param array
* @param size
* @return
*/
  static def partitionArray(array, size) {
   def partitions = []
   int partitionCount = array.size() / size

   partitionCount.times { partitionNumber ->
       def start = partitionNumber * size
       def end = start + size - 1
       partitions << array[start..end]
   }

   if (array.size() % size) partitions << array[partitionCount * size..-1]
   return partitions
}
}

The manager can be called like this:

  def parallFunc = { array->
        log.info "I'm multiplying $array by 2"
        return array.collect{it*2}
    }

  def results = ParallelManager.runParallelTask( [1,2,3,4,5,6,7,8], parallFunc, 3)

This code occasionally throws the following exception at the line marked above:

 Exception in thread "Thread-3" java.lang.ArrayIndexOutOfBoundsException: 1
 [java]     at java.util.ArrayList.add(ArrayList.java:352)
 [java]     at java_util_List$add.call(Unknown Source)

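Do you have a solution to this problem? I think a small thread manager like this would be useful to many people for speeding up common tasks in their code.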

Cheers, Mulong


3 Answers


This is how you would do it with GPars:

@Grab( 'org.codehaus.gpars:gpars:0.12' )
import groovyx.gpars.*

def arr = [ 1, 2, 3, 4, 5, 6, 7, 8 ]

arr = GParsPool.withPool {
  arr.collectParallel { it * 2 }
}
Answered 2012-10-26T08:36:01.990

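Have you heard of the gpars project? It is a proven library that aims to make concurrent programming for multi-core hardware intuitive. It is very powerful for collection processing.

I would suggest relying on this library rather than implementing your own limited version of a simple thread manager.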

Answered 2012-10-25T20:46:34.987

I solved the problem by using Vector instead of ArrayList. Working code below:

class ParallelManager {

static def log = Logger.getLogger(ParallelManager)

Vector _threads = []
Vector _threadsFinishedCorrectly = []
Vector _results = []

/**
 * 
 * @param dataArray
 * @param chunkSize
 * @param taskFunc
 * @return
 */
def runParallelTasks( def dataArray, int chunkSize, def taskFunc ){
    reset()
    assert chunkSize > 0
    assert taskFunc
    if (dataArray.size()==0) return

    assert dataArray.size() >= 0

    def subArray = partitionArray(dataArray, chunkSize)
    assert subArray.size() > 0

    subArray.each{ arrChunk->
        _threads.add( Thread.start{

            def chunkResults = taskFunc(arrChunk)
            assert chunkResults != null 
            _results.add(chunkResults)
            _threadsFinishedCorrectly.add(true)
        })
    }

    // wait for all threads to finish
    log.debug("Waiting for all threads to finish...")
    _threads.each{ it.join() }

    assert _threadsFinishedCorrectly.size() == _threads.size(),'some threads failed.'
    assert _results.size() == _threads.size() 
    log.debug("${_threads.size()} finished.")
    def res = _results.flatten()    
    //reset()
    assert dataArray.size() == res.size(),"Something went wrong. Some threads did not return their results. results=$res" 
    return res
}

void reset(){
    _threads = []
    _results = []
    _threadsFinishedCorrectly = []
}

/**
*
* @param array
* @param size
* @return
*/
def partitionArray(array, size) {
   def partitions = []
   int partitionCount = array.size() / size

   partitionCount.times { partitionNumber ->
       def start = partitionNumber * size
       def end = start + size - 1
       partitions << array[start..end]
   }

   if (array.size() % size) partitions << array[partitionCount * size..-1]
   return partitions
  }
}

The manager can be called like this:

someClosure = {
    def resArray = doSomethingOn(it)
    return(resArray)
}
def resultArray = new ParallelManager().runParallelTasks( inputArray, 4, someClosure )
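
A note on why this works, plus one caveat. ArrayList is not thread-safe, so concurrent add calls from several threads can corrupt its internal array, which matches the ArrayIndexOutOfBoundsException in the question. Vector synchronizes its methods, so the adds no longer collide. The results are still appended in whatever order the threads finish, though, so the flattened output is not guaranteed to match the input order. Below is a minimal sketch of an alternative that avoids the shared result list entirely by using the JDK's ExecutorService. The helper name runInChunks is hypothetical and not part of the code above; the sketch assumes Groovy 1.8.6 or later (for collate) and that the task closure returns a list for each chunk.

```groovy
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Hypothetical helper for illustration; not part of the ParallelManager class above.
def runInChunks(List data, int chunkSize, Closure taskFunc) {
    assert chunkSize > 0
    if (!data) return []

    def pool = Executors.newFixedThreadPool(Runtime.runtime.availableProcessors())
    try {
        // collate(n) splits the list into consecutive sublists of at most n elements
        List<Callable> tasks = data.collate(chunkSize).collect { chunk ->
            return { taskFunc(chunk) } as Callable
        }
        // invokeAll blocks until every task has completed and returns the futures
        // in the same order as the tasks, so no shared mutable list is needed.
        // get() rethrows any exception raised inside a task.
        return pool.invokeAll(tasks).collectMany { it.get() }
    } finally {
        pool.shutdown()
    }
}

def doubled = runInChunks([1, 2, 3, 4, 5, 6, 7, 8], 3) { chunk -> chunk.collect { it * 2 } }
assert doubled == [2, 4, 6, 8, 10, 12, 14, 16]
```

Because each chunk's result is read from its own future, a failing task surfaces as an exception from get() instead of a missing entry, and the output order follows the input order regardless of which thread finishes first.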
Answered 2012-10-26T08:36:47.197