1

我正在努力为我的应用程序问题找到解决方案。
我的应用程序正在生成包含一百万个点的图表,为此我正在保存一个字符串,其中所有点都由房间数据库上的空格分隔。

话虽这么说,我正在加载这个点字符串,使用 mapIndex 将其拆分并在所有点上放置索引,如下所示:

val map = mutableMapOf<Float, Float>()
            signal.split(" ").mapIndexed { index, signal ->
                signal.toFloatOrNull()?.let { floatSignal ->
                    map[index.toFloat()] = floatSignal
                }
            }

使用此代码,我可以在图表上显示所有点,但是在处理一百万个点(超过三秒)时,此操作 mapIndexed 太慢了
我的问题是,有一种方法可以在多线程上拆分 mapIndexed,然后加入结果?对不起,如果有点难以理解,我不是母语人士

4

2 回答 2

4

您可以使用chunked和 协程的组合:

  • chunked将列表拆分为多个部分
  • async启动一个新的协程
  • awaitAll等待协程完成
  • flatten将块组合Pairs到一个列表中
  • toMap将对列表转换为新的Map

请注意,这runBlocking仅用作创建协程范围的示例。

runBlocking(Dispatchers.Default) {
    val chunkSize = 10000
    val map = signal.split(" ").chunked(chunkSize).mapIndexed { chunkIndex, chunk ->
        async {
            chunk.mapIndexedNotNull { index, signal ->
                signal.toFloatOrNull()?.let { floatSignal ->
                    (chunkIndex * chunkSize + index).toFloat() to floatSignal
                }
            }
        }
    }.awaitAll().flatten().toMap()
}
于 2019-03-12T06:41:36.870 回答
1

并行运行 a 的问题map是您可能会丢失列表的顺序。这是一个forEach将并行执行的循环示例

import java.util.*
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors


private fun getExecutor(threads: Int) = Executors.newFixedThreadPool(threads)
private fun ExecutorService.run(runnables: List<() -> Unit>) {
    val jobs = runnables.map {
        submit(it)
    }
    jobs.forEach {
        it.get()
    }
}

private val _executorMap = mutableMapOf<Int, ExecutorService>()

fun <T> Iterable<T>.forEachParallel(threads: Int = Runtime.getRuntime().availableProcessors(), block: (T) -> Unit) {

    val executor = _executorMap.getOrPut(threads) { getExecutor(threads) }

    val list: List<() -> Unit> = (0 until size).map { i ->
        { block(get(i)) }
    }

    executor.run(list)
}

或者,您可以使用 RxJava

private val _executorMap = mutableMapOf<Int, Scheduler>()
private fun getScheduler(threads: Int) =
    _executorMap.getOrPut(threads) { Schedulers.from(Executors.newFixedThreadPool(threads)) }

fun <T> Iterable<T>.forEachParallel(threads: Int = Runtime.getRuntime().availableProcessors(), block: (T) -> Unit) {

    Flowable.fromIterable(this)
        .parallel(threads)
        .runOn(getScheduler(threads))
        .map(block)
        .sequential()
        .blockingSubscribe()

如果您确实需要订单,您可以轻松修改这 2 个函数以跟踪索引。你可以像这样使用它:

 bigList.forEachParallel { 
        val result = someComputation(it)
        newList.add(result)
    }

于 2019-03-12T03:26:39.913 回答