我连接到每 250 毫秒发送一次数据的蓝牙设备
在我的视图模型中,我希望订阅所述数据,运行一些暂停代码(运行大约需要 1000 毫秒),然后显示结果。
以下是我正在尝试做的一个简单示例
存储库:
class Repo() : CoroutineScope {
private val supervisor = SupervisorJob()
override val coroutineContext: CoroutineContext = supervisor + Dispatchers.Default
private val _dataFlow = MutableSharedFlow<Int>()
private var dataJob: Job? = null
val dataFlow: Flow<Int> = _dataFlow
init {
launch {
var counter = 0
while (true) {
counter++
Log.d("Repo", "emmitting $counter")
_dataFlow.emit(counter)
delay(250)
}
}
}
}
视图模型
class VM(app:Application):AndroidViewModel(app) {
private val _reading = MutableLiveData<String>()
val latestReading :LiveData<String>() = _reading
init {
viewModelScope.launch(Dispatchers.Main) {
repo.dataFlow
.map {
validateData() //this is where some validation happens it is very fast
}
.flowOn(Dispatchers.Default)
.forEach {
delay(1000) //this is to simulate the work that is done,
}
.flowOn(Dispatchers.IO)
.map {
transformData() //this will transform the data to be human readable
}
.flowOn(Dispatchers.Default)
.collect {
_reading.postValue(it)
}
}
}
}
如您所见,当数据到来时,首先我验证它以确保它没有损坏(在默认调度程序上)然后我对其执行一些操作(保存并运行一个需要时间在 IO 调度程序上的长算法)然后我改变它以便应用程序用户可以理解它(切换回默认调度程序)然后我将其发布到可变实时数据,因此如果有来自 ui 层的订阅者,他们可以看到当前数据(在主调度程序上)
我有两个问题
a) 如果validateData
失败,我该如何取消当前发射并继续下一个发射?
b) 有没有办法让在 viewModel 上工作的 dataFlow 订阅者生成新线程,以便延迟部分可以并行运行?
有没有办法做到这一点?
我尝试使用 buffer(),正如文档所述,“通过指定容量的通道缓冲流量排放并在单独的协程中运行收集器”。但是当我将它设置为 BufferOverflow.SUSPEND 我得到第一部分的行为,当我将它设置为 BufferOverflow.DROP_OLDEST 或 BufferOverflow.DORP_LATEST 我失去了排放
我也尝试过像这样使用 .conflate() :
repo.dataFlow
.conflate()
.map { ....
即使排放一个接一个地开始,有延迟的部分仍然等待前一个完成,然后再开始下一个
当我对那部分使用 .flowOn(Dispatchers.Default) 时,我会释放排放,当我使用 .flowOn(Dispatchers.IO) 或类似的东西时,Executors.newFixedThreadPool(4).asCoroutineDispatcher()
他们总是等待前一个完成后再开始新的
编辑2:
经过大约 3 小时的实验,这似乎有效
viewModelScope.launch(Dispatchers.Default) {
repo.dataFlow
.map {
validateData(it)
}
.flowOn(Dispatchers.Default)
.map {
async {
delay(1000)
it
}
}
.flowOn(Dispatchers.IO) // NOTE (A)
.map {
val result = it.await()
transformData(result)
}
.flowOn(Dispatchers.Default)
.collect {
_readings.postValue(it)
}
}
但是我仍然没有弄清楚如果验证数据失败如何取消发射
并且由于某种原因,它仅在我使用 Dispatchers.IO 、 Executors.newFixedThreadPool(20).asCoroutineDispatcher() 和 Dispatchers.Unconfined 时才有效,我在注释 (A) 的位置, Dispatchers.Main 似乎不起作用(我预期的)但是Dispatchers.Default 似乎也不起作用,我不知道为什么