我想在我的一些服务上使用线程安全计数器来计算并发工作。
比如说,有一个 http 服务可以为多个请求提供服务,并且有一个isWorking
稍后用于显示微调器的属性。
这是我对计数器的实现:
class CounterImpl @Inject constructor(
@IoDispatcher private val ioDispatcher: CoroutineDispatcher,
@MainDispatcher private val mainDispatcher: CoroutineDispatcher,
private val log: Log
) : Counter {
private val channel = Channel<Int>()
private val _isWorking = MutableLiveData(false)
override val isWorking = _isWorking.asFlow()
init {
MainScope().launch {
channel.consumeAsFlow().onEach {
log(FILE, "counter got a $it", LogType.Debug)
}.scan(0) { acc, value -> acc + value }.map { it > 0}
.collect {
withContext(mainDispatcher) {
_isWorking.value = it
}
}
}
}
override suspend fun invoke(increment: Boolean) {
log(FILE, "counter invoked with $increment", LogType.Debug)
channel.send(if (increment) 1 else -1)
}
}
所以问题是有时对通道的最后一次发送调用没有到达consumeAsFlow
代码的一部分。
这是发生的事情的示例日志:
[Debug] Counter: counter invoked with true
[Debug] Counter: counter invoked with false
[Debug] Counter: counter got a 1
在这里,invoke 被调用一次,true
并且有一行表示counter got a 1
对应于该真实(增量)调用。但也有一个调用,false
我希望有一个相应的counter got a 0
行。但是那个永远不会出现。
for c in channel
如果这就是你的想法,我也尝试过迭代频道。