2

我需要创建一个 API,它应该是流,它收集事件。问题是这些事件可能来自一个通道(我需要一个用于 PublishSubject 的模拟)和一个流(它执行网络请求)。

我也不确定这是否是最好的解决方案,所以让我知道我是否可以做得更好。

我在做什么:

我的 API:

override val statusFlow = trackStatus()

private fun trackStatus(): Flow<State> = flow { ... }

private val deviceChannel = Channel<State>(CONFLATED)

所以 statusFlow 应该返回一个流,我可以从中接收来自流和通道的数据。

我试图通过consumeAsflow将通道转换为流,但它不起作用。

我看到一个解决方案

private fun trackStatus(): Flow<State> = flowOf(channel.toFlow(), flow).flattenMerge()

正确的方法是什么?

4

2 回答 2

1
private fun trackStatus() = merge(deviceChannel.recieveAsFlow(), trackStatus)

merge()协程库中的定义是

/**
 * Merges the given flows into a single flow without preserving an order of elements.
 * All flows are merged concurrently, without limit on the number of simultaneously collected flows.
 *
 * ### Operator fusion
 *
 * Applications of [flowOn], [buffer], [produceIn], and [broadcastIn] _after_ this operator are fused with
 * its concurrent merging so that only one properly configured channel is used for execution of merging logic.
 */
@ExperimentalCoroutinesApi
public fun <T> merge(vararg flows: Flow<T>): Flow<T> = flows.asIterable().merge()
于 2020-06-01T22:45:09.020 回答
0

这种情况的解决方案是 merge(),正如省答案中所指出的那样,但它不会那样工作。您应该使用 BroadcastChannel 而不是 Channel 因为最后一个在生命周期内只能提供一个订阅。此外,您应该使用 asFlow() 将此类通道转换为流。

于 2020-06-02T09:35:23.477 回答