我用 Groovy 写了一个小线程管理器。管理器获取一个数组、一个要在数组上执行的任务以及一个块大小。然后在单独的线程中执行任务,并将结果添加到结果数组中。
这是类代码:
class ParallelManager {
static def _threads = []
static def _threadsFinishedCorrectly = []
static def _results = []
static def runParallelTask( def dataArray, def taskFunc, int chunkSize ){
assert chunkSize > 0
assert taskFunc
if (dataArray.size()==0) return
assert dataArray.size() >= 0
def subArray = partitionArray(dataArray, chunkSize)
assert subArray.size() > 0
subArray.each{ arrChunk->
_threads.add( Thread.start{
def chunkResults = taskFunc(arrChunk)
assert chunkResults != null
_results.add(chunkResults) // EXCEPTION HERE
_threadsFinishedCorrectly.add(true)
})
}
// wait for all threads to finish
_threads.each{ it.join() }
log.info("Waiting for all threads to finish...")
assert _threadsFinishedCorrectly.size() == _threads.size(),'some threads failed.'
assert _results.size() == _threads.size()
log.info("${_threads.size()} finished.")
return _results
}
/**
* Util function
* @param array
* @param size
* @return
*/
static def partitionArray(array, size) {
def partitions = []
int partitionCount = array.size() / size
partitionCount.times { partitionNumber ->
def start = partitionNumber * size
def end = start + size - 1
partitions << array[start..end]
}
if (array.size() % size) partitions << array[partitionCount * size..-1]
return partitions
}
经理可以这样称呼:
def parallFunc = { array->
log.info "I'm multiplying $array by 2"
return array.collect{it*2}
}
def results = ParallelManager.runParallelTask( [1,2,3,4,5,6,7,8], parallFunc, 3)
此代码偶尔会在上面标记的行中引发此异常:
Exception in thread "Thread-3" java.lang.ArrayIndexOutOfBoundsException: 1
[java] at java.util.ArrayList.add(ArrayList.java:352)
[java] at java_util_List$add.call(Unknown Source)
你有解决这个问题的方法吗?我认为像这样的一个小线程管理器对许多人在他们的代码中加速常见任务很有用。
干杯,穆龙