5

我有一种情况,我需要观察用户 ID,然后使用这些用户 ID 来观察用户。userIds 或 users 可以随时更改,我希望让发出的用户保持最新。这是我拥有的数据来源的示例:


data class User(val name: String)

fun observeBestUserIds(): Flow<List<String>> {
    return flow {
        emit(listOf("abc", "def"))
        delay(500)
        emit(listOf("123", "234"))
    }
}

fun observeUserForId(userId: String): Flow<User> {
    return flow {
        emit(User("${userId}_name"))
        delay(2000)
        emit(User("${userId}_name_updated"))
    }
}

在这种情况下,我希望排放量为:

[User(abc_name), User(def_name)], 然后

[User(123_name), User(234_name)], 然后

[User(123_name_updated), User(234_name_updated)]

我想我可以像这样在 RxJava 中实现这一点:

observeBestUserIds.concatMapSingle { ids ->
    Observable.fromIterable(ids)
        .concatMap { id ->
            observeUserForId(id)
        }
        .toList()
}

我会写什么函数来制作一个发出它的流?

4

3 回答 3

4

我相信您正在寻找combine,它为您提供了一个可以轻松调用的数组toList()

observeBestUserIds().collectLatest { ids ->
    combine(
        ids.map { id -> observeUserForId(id) }
    ) {
        it.toList()
    }.collect {
        println(it)
    } 
}

这是具有更明确参数名称的内部部分,因为您在 Stack Overflow 上看不到 IDE 的类型提示:

combine(
    ids.map { id -> observeUserForId(id) }
) { arrayOfUsers: Array<User> ->
    arrayOfUsers.toList()
}.collect { listOfUsers: List<User> ->
    println(listOfUsers)
}

输出:

[User(name=abc_name), User(name=def_name)]
[User(name=123_name), User(name=234_name)]
[User(name=123_name_updated), User(name=234_name)]
[User(name=123_name_updated), User(name=234_name_updated)]

现场演示(请注意,在演示中,所有输出都会同时出现,但这是演示站点的限制 - 这些行的出现时间与您在本地运行代码时所期望的时间一致)

这避免了原始问题中讨论的 ( abc_name_updated, )。def_name_updated但是,仍然有一个中间发射,123_name_updated因为234_name123_name_updated首先发射,并且它立即发送组合版本,因为它们是每个流中的最新版本。

但是,可以通过消除发射来避免这种情况(在我的机器上,小到 1 毫秒的超时有效,但为了保守起见,我做了 20 毫秒):

observeBestUserIds().collectLatest { ids ->
    combine(
        ids.map { id -> observeUserForId(id) }
    ) {
        it.toList()
    }.debounce(timeoutMillis = 20).collect {
        println(it)
    }
}

它可以为您提供所需的确切输出:

[User(name=abc_name), User(name=def_name)]
[User(name=123_name), User(name=234_name)]
[User(name=123_name_updated), User(name=234_name_updated)]

现场演示

于 2020-03-24T10:34:34.357 回答
1

不幸的是,对于 kotlin Flow 的当前状态而言,这并非微不足道,似乎缺少重要的运算符。但请注意,您不是在寻找 rxJavas toList()。如果您尝试在 rxjava 中执行此操作,则toList必须concatMap等到所有 observabes 完成。这不是你想要的。

不幸的是,我认为没有办法绕过自定义函数。

它必须汇总observeUserForId您将传递给它的所有 id 返回的所有结果。它也不是一个简单的窗口函数,因为实际上可以想象一个observeUserForId已经返回两次而另一个调用仍然没有完成。因此,检查您是否已经拥有与将 id 传递给聚合函数相同数量的用户是不够的,您还必须按用户 id 进行分组。

今天晚些时候我会尝试添加代码。

编辑:这里承诺的是我的解决方案,我冒昧地稍微增加了要求。因此,每次所有 userId 都具有值并且基础用户更改时,流程都会发出。我认为这更有可能是您想要的,因为用户可能不会同步更改属性。

不过,如果这不是您想要的,请发表评论。

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking


data class User(val name: String)

fun observeBestUserIds(): Flow<List<String>> {
    return flow {
        emit(listOf("abc", "def"))
        delay(500)
        emit(listOf("123", "234"))
    }
}

fun observeUserForId(userId: String): Flow<User> {
    return flow {
        emit(User("${userId}_name"))
        delay(2000)
        emit(User("${userId}_name_updated"))
    }
}

inline fun <reified K, V> buildMap(keys: Set<K>, crossinline valueFunc: (K) -> Flow<V>): Flow<Map<K, V>> = flow {
    val keysSize = keys.size
    val valuesMap = HashMap<K, V>(keys.size)
    flowOf(*keys.toTypedArray())
            .flatMapMerge { key -> valueFunc(key).map {v -> Pair(key, v)} }
            .collect { (key, value) ->
                valuesMap[key] = value
                if (valuesMap.keys.size == keysSize) {
                    emit(valuesMap.toMap())
                }
            }
}

fun observeUsersForIds(): Flow<List<User>> {
    return observeBestUserIds().flatMapLatest { ids -> buildMap(ids.toSet(), ::observeUserForId as (String) -> Flow<User>) }
            .map { m -> m.values.toList() }
}


fun main() = runBlocking {
    observeUsersForIds()
        .collect { user ->
            println(user)
        }
}

这将返回

[User(name=def_name), User(name=abc_name)]
[User(name=123_name), User(name=234_name)]
[User(name=123_name_updated), User(name=234_name)]
[User(name=123_name_updated), User(name=234_name_updated)]

你可以在这里在线运行代码

于 2020-03-23T12:23:52.360 回答
0

您可以使用flatMapConcat

val users = observeBestUserIds()
        .flatMapConcat { ids ->
            flowOf(*ids.toTypedArray())
                .map { id ->
                    observeUserForId(id)
                }
        }
        .flattenConcat()
        .toList()

或者

    observeBestUserIds()
        .flatMapConcat { ids ->
            flowOf(*ids.toTypedArray())
                .map { id ->
                    observeUserForId(id)
                }
        }
        .flattenConcat()
        .collect { user ->

        }
于 2020-03-09T15:23:07.423 回答