0

我想在我的一些服务上使用线程安全计数器来计算并发工作。

比如说,有一个 http 服务可以为多个请求提供服务,并且有一个isWorking稍后用于显示微调器的属性。

这是我对计数器的实现:

class CounterImpl @Inject constructor(
    @IoDispatcher private val ioDispatcher: CoroutineDispatcher,
    @MainDispatcher private val mainDispatcher: CoroutineDispatcher,
    private val log: Log
) : Counter {
    private val channel = Channel<Int>()
    private val _isWorking = MutableLiveData(false)
    override val isWorking = _isWorking.asFlow()

    init {
        MainScope().launch {
            channel.consumeAsFlow().onEach {
                log(FILE, "counter got a $it", LogType.Debug)
            }.scan(0) { acc, value -> acc + value }.map { it > 0}
                .collect {
                    withContext(mainDispatcher) {
                        _isWorking.value = it
                    }
                }
        }
    }

    override suspend fun invoke(increment: Boolean) {
        log(FILE, "counter invoked with $increment", LogType.Debug)
        channel.send(if (increment) 1 else -1)
    }
}

所以问题是有时对通道的最后一次发送调用没有到达consumeAsFlow代码的一部分。

这是发生的事情的示例日志:

[Debug] Counter: counter invoked with true
[Debug] Counter: counter invoked with false
[Debug] Counter: counter got a 1

在这里,invoke 被调用一次,true并且有一行表示counter got a 1对应于该真实(增量)调用。但也有一个调用,false我希望有一个相应的counter got a 0行。但是那个永远不会出现。

for c in channel如果这就是你的想法,我也尝试过迭代频道。

4

1 回答 1

1

您不需要 aChannel这样做,aMutableStateFlow就足够了,因为您正在执行的操作(增加/减少数字)没有副作用。

    val count = MutableStateFlow(0)

    fun update(increment: Boolean) {
        count.update { it + if (increment) 1 else -1 }
    }

注意:update {}lambda 中的函数可能会被执行多次,所以它不能有副作用。

于 2022-02-21T11:12:36.797 回答