1

我正在尝试使用 kotlin 协程创建一个轮询机制,sharedFlow并希望在没有订阅者时停止并在至少有一个订阅者时激活。我的问题是,sharedFlow在这种情况下是正确的选择还是应该使用channel. 我尝试使用channelFlow,但我不知道如何关闭cancel块体外部的通道(不是工作)。有人可以帮忙吗?这是片段。

 fun poll(id: String) = channelFlow {
            while (!isClosedForSend) {
                try {
                    send(repository.getDetails(id))
                    delay(MIN_REFRESH_TIME_MS)
                } catch (throwable: Throwable) {
                    Timber.e("error -> ${throwable.message}")
                }
                invokeOnClose { Timber.e("channel flow closed.") }
        }
    } 
4

2 回答 2

0

首先,调用channelFlow(block)时,不需要手动关闭通道。区块执行完成后,通道将自动关闭。

我认为“生产”协程构建器功能可能是您所需要的。但不幸的是,它仍然是一个实验性的 api。

fun poll(id: String) = someScope.produce {
    invokeOnClose { Timber.e("channel flow closed.") }

    while (true) {
        try {
            send(repository.getDetails(id))
//          delay(MIN_REFRESH_TIME_MS)   //no need
        } catch (throwable: Throwable) {
            Timber.e("error -> ${throwable.message}")
        }
    }
}

fun main() = runBlocking {
    val channel = poll("hello")

    channel.receive()

    channel.cancel()
}

当你不调用返回通道的 receive() 方法时,produce 函数会暂停,所以不需要延迟。

更新:用于broadcast跨多个 ReceiveChannel 共享值。

fun poll(id: String) = someScope.broadcast {
    invokeOnClose { Timber.e("channel flow closed.") }

    while (true) {
        try {
            send(repository.getDetails(id))
//          delay(MIN_REFRESH_TIME_MS)   //no need
        } catch (throwable: Throwable) {
            Timber.e("error -> ${throwable.message}")
        }
    }
}

fun main() = runBlocking {
    val broadcast = poll("hello")

    val channel1 = broadcast.openSubscription()
    val channel2 = broadcast.openSubscription()
    
    channel1.receive()
    channel2.receive()

    broadcast.cancel()
}
于 2021-07-16T13:54:39.707 回答
0

您可以使用以广播方式发出值的 SharedFlow(在前一个值被所有收集器消耗之前不会发出新值)。

val sharedFlow = MutableSharedFlow<String>()
val scope = CoroutineScope(Job() + Dispatchers.IO)
var producer: Job()

scope.launch {
    val producer = launch() {
            sharedFlow.emit(...)
    }

    sharedFlow.subscriptionCount
              .map {count -> count > 0}
              .distinctUntilChanged()
              .collect { isActive -> if (isActive) stopProducing() else startProducing()
}

fun CoroutineScope.startProducing() {
    producer = launch() {
        sharedFlow.emit(...)
    }
        
}

fun stopProducing() {
    producer.cancel()
}
于 2021-08-10T16:53:16.653 回答