4

我有一组对象,我需要对其执行一些转换。目前我正在使用:

var myObjects: List<MyObject> = getMyObjects()

myObjects.forEach{ myObj ->
    someMethod(myObj)
}

它工作正常,但我希望通过并行运行来加速它someMethod(),而不是等待每个对象完成,然后再开始下一个对象。

在 Kotlin 中有没有办法做到这一点?也许与doAsyncTask或什么?

我知道一年多前有人问这个问题时这是不可能的,但是现在 Kotlin 有协程,doAsyncTask我很好奇协程是否可以提供帮助

4

4 回答 4

17

是的,这可以使用协程来完成。以下函数对集合的所有元素并行应用操作:

fun <A>Collection<A>.forEachParallel(f: suspend (A) -> Unit): Unit = runBlocking {
    map { async(CommonPool) { f(it) } }.forEach { it.await() }
}

虽然定义本身有点神秘,但您可以按照您的预期轻松应用它:

myObjects.forEachParallel { myObj ->
    someMethod(myObj)
}

并行映射可以以类似的方式实现,请参见https://stackoverflow.com/a/45794062/1104870

于 2017-08-21T10:07:33.197 回答
5

Java Stream 在 Kotlin 中很容易使用:

tasks.stream().parallel().forEach { computeNotSuspend(it) }

但是,如果您使用的是 Android,如果您想要与低于 24 的 API 兼容的应用程序,则不能使用 Java 8。

您也可以按照您的建议使用协程。但截至目前(2017 年 8 月),它还不是语言的一部分,您需要安装一个外部库。有很好的示例指南

    runBlocking<Unit> {
        val deferreds = tasks.map { async(CommonPool) { compute(it) } }
        deferreds.forEach { it.await() }
    }

请注意,协程是通过非阻塞多线程实现的,这意味着它们可以比传统的多线程更快。我有下面的代码对 Stream 并行与协程进行基准测试,在这种情况下,协程方法在我的机器上要快 7 倍。 但是,您必须自己做一些工作以确保您的代码“暂停”(非锁定),这可能非常棘手。 在我的示例中,我只是调用了库提供delaysuspend函数。非阻塞多线程并不总是比传统多线程快。如果您有许多线程除了等待 IO 之外什么都不做,它会更快,这就是我的基准测试正在做的事情。

我的基准测试代码:

import kotlinx.coroutines.experimental.CommonPool
import kotlinx.coroutines.experimental.async
import kotlinx.coroutines.experimental.delay
import kotlinx.coroutines.experimental.launch
import kotlinx.coroutines.experimental.runBlocking
import java.util.*
import kotlin.system.measureNanoTime
import kotlin.system.measureTimeMillis

class SomeTask() {
    val durationMS = random.nextInt(1000).toLong()

    companion object {
        val random = Random()
    }
}

suspend fun compute(task: SomeTask): Unit {
    delay(task.durationMS)
    //println("done ${task.durationMS}")
    return
}

fun computeNotSuspend(task: SomeTask): Unit {
    Thread.sleep(task.durationMS)
    //println("done ${task.durationMS}")
    return
}

fun main(args: Array<String>) {
    val n = 100
    val tasks = List(n) { SomeTask() }

    val timeCoroutine = measureNanoTime {
        runBlocking<Unit> {
            val deferreds = tasks.map { async(CommonPool) { compute(it) } }
            deferreds.forEach { it.await() }
        }
    }

    println("Coroutine ${timeCoroutine / 1_000_000} ms")

    val timePar = measureNanoTime {
        tasks.stream().parallel().forEach { computeNotSuspend(it) }
    }
    println("Stream parallel ${timePar / 1_000_000} ms")
}

我的 4 核计算机上的输出:

Coroutine: 1037 ms
Stream parallel: 7150 ms

如果您取消注释掉println这两个compute函数中的注释,您将看到在非阻塞协同程序代码中,任务以正确的顺序处理,但不是使用 Streams。

于 2017-08-17T21:45:56.170 回答
0

你可以使用RxJava来解决这个问题。

List<MyObjects> items = getList()

Observable.from(items).flatMap(object : Func1<MyObjects, Observable<String>>() {
    fun call(item: MyObjects): Observable<String> {
        return someMethod(item)
    }
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(object : Subscriber<String>() {
    fun onCompleted() {

    }

    fun onError(e: Throwable) {

    }

    fun onNext(s: String) {
        // do on output of each string
    }
})

通过订阅 on Schedulers.io(),一些方法被安排在后台线程上。

于 2017-08-08T18:47:14.523 回答
0

要并行处理集合中的项目,您可以使用Kotlin Coroutines。例如,以下扩展函数并行处理项目并等待它们被处理:

suspend fun <T, R> Iterable<T>.processInParallel(
    dispatcher: CoroutineDispatcher = Dispatchers.IO,
    processBlock: suspend (v: T) -> R,
    ): List<R> = coroutineScope { // or supervisorScope
          map {
              async(dispatcher) { processBlock(it) }
          }.awaitAll()
    }

这是类型的suspend扩展函数Iterable<T>,它对项目进行并行处理并返回处理每个项目的一些结果。默认情况下,它使用Dispatchers.IO调度程序将阻塞任务卸载到共享线程池。必须从协程(包括带有Dispatchers.Main调度程序的协程)或其他suspend函数中调用。

从协程调用的示例:

val myObjects: List<MyObject> = getMyObjects()

someCoroutineScope.launch {
    val results = myObjects.processInParallel {
        someMethod(it)
    }
    // use processing results
}

someCoroutineScope的实例在哪里CoroutineScope


或者,如果您只想启动并忘记,您可以使用此功能:

fun <T> CoroutineScope.processInParallelAndForget(
    iterable: Iterable<T>,
    dispatcher: CoroutineDispatcher = Dispatchers.IO,
    processBlock: suspend (v: T) -> Unit
) = iterable.forEach {
    launch(dispatcher) { processBlock(it) }
}

这是 上的扩展函数CoroutineScope,不返回任何结果。它还Dispatchers.IO默认使用调度程序。可以使用CoroutineScope或从另一个协程调用。调用示例:

someoroutineScope.processInParallelAndForget(myObjects) {
    someMethod(it)
}

// OR from another coroutine:

someCoroutineScope.launch {
    processInParallelAndForget(myObjects) {
        someMethod(it)
    }
}

someCoroutineScope的实例在哪里CoroutineScope

于 2022-01-25T09:55:52.483 回答