4

今天一直被 Kotlin 流/通道的问题所困扰。本质上,我想获取从流中发出的值,并立即将它们发送到通道中。然后,我们通过公开的方法以流的形式订阅该频道。这里的用例是拥有一个始终在线的频道订阅和一个可以独立打开和关闭的流。

private val dataChannel = BroadcastChannel<Data>(1)

 suspend fun poll() {
    poller.start(POLLING_PERIOD_MILLISECONDS)
        .collect {
            dataChannel.send(it)
        }
 }

 suspend fun stopPoll() {
     poller.stop()
 }

 suspend fun subscribe(): Flow<Data> {
     return dataChannel.asFlow()
 }

我在这里的简单用例是一个返回 channelFlow 的轮询器。理想情况下,我可以在 collect 方法中向通道发射。这似乎不起作用。我的菜鸟协程的想法是,因为收集和发送正在暂停,所以排放在收集中被暂停,我们被卡住了。

是否有任何用于流或通道的内置函数可以处理此行为或以任何其他方式实现此行为?

4

1 回答 1

0

对于您的情况,您可以尝试使用热数据流SharedFlow而不是Channel

private val dataFlow = MutableSharedFlow<String>(extraBufferCapacity = 1)

 suspend fun poll() {
    poller.start(POLLING_PERIOD_MILLISECONDS)
        .collect {
            dataFlow.tryEmit(it)
        }
 }

 suspend fun stopPoll() {
     poller.stop()
 }

 fun subscribe(): Flow<Data> {
     return dataFlow
 }

tryEmit()- 尝试在不暂停的情况下向此共享流发出值,因此调用它不会暂停collect块。

于 2022-01-09T19:54:16.160 回答