并行运行 a 的问题map
是您可能会丢失列表的顺序。这是一个forEach
将并行执行的循环示例
import java.util.*
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
private fun getExecutor(threads: Int) = Executors.newFixedThreadPool(threads)
private fun ExecutorService.run(runnables: List<() -> Unit>) {
val jobs = runnables.map {
submit(it)
}
jobs.forEach {
it.get()
}
}
private val _executorMap = mutableMapOf<Int, ExecutorService>()
fun <T> Iterable<T>.forEachParallel(threads: Int = Runtime.getRuntime().availableProcessors(), block: (T) -> Unit) {
val executor = _executorMap.getOrPut(threads) { getExecutor(threads) }
val list: List<() -> Unit> = (0 until size).map { i ->
{ block(get(i)) }
}
executor.run(list)
}
或者,您可以使用 RxJava
private val _executorMap = mutableMapOf<Int, Scheduler>()
private fun getScheduler(threads: Int) =
_executorMap.getOrPut(threads) { Schedulers.from(Executors.newFixedThreadPool(threads)) }
fun <T> Iterable<T>.forEachParallel(threads: Int = Runtime.getRuntime().availableProcessors(), block: (T) -> Unit) {
Flowable.fromIterable(this)
.parallel(threads)
.runOn(getScheduler(threads))
.map(block)
.sequential()
.blockingSubscribe()
如果您确实需要订单,您可以轻松修改这 2 个函数以跟踪索引。你可以像这样使用它:
bigList.forEachParallel {
val result = someComputation(it)
newList.add(result)
}