1

我连接到每 250 毫秒发送一次数据的蓝牙设备

在我的视图模型中,我希望订阅所述数据,运行一些暂停代码(运行大约需要 1000 毫秒),然后显示结果。

以下是我正在尝试做的一个简单示例

存储库:

class Repo() : CoroutineScope {
    private val supervisor = SupervisorJob()
    override val coroutineContext: CoroutineContext = supervisor + Dispatchers.Default
    private val _dataFlow = MutableSharedFlow<Int>()
    private var dataJob: Job? = null
    val dataFlow: Flow<Int> = _dataFlow

    init {
         launch {
            var counter = 0
            while (true) {
                counter++
                Log.d("Repo", "emmitting $counter")
                _dataFlow.emit(counter)
                delay(250)
            }
        }
    }

}

视图模型

class VM(app:Application):AndroidViewModel(app) {
    private val _reading = MutableLiveData<String>()
    val latestReading :LiveData<String>() = _reading

    init {
        viewModelScope.launch(Dispatchers.Main) {
            repo.dataFlow
                .map {
                    validateData() //this is where some validation happens it is very fast
                }
                .flowOn(Dispatchers.Default)
                .forEach {
                    delay(1000) //this is to simulate the work that is done,
                }
                .flowOn(Dispatchers.IO)
                .map {
                   transformData() //this will transform the data to be human readable 
                }
                .flowOn(Dispatchers.Default)
                .collect {
                    _reading.postValue(it)
                }
        }

    }
}

如您所见,当数据到来时,首先我验证它以确保它没有损坏(在默认调度程序上)然后我对其执行一些操作(保存并运行一个需要时间在 IO 调度程序上的长算法)然后我改变它以便应用程序用户可以理解它(切换回默认调度程序)然后我将其发布到可变实时数据,因此如果有来自 ui 层的订阅者,他们可以看到当前数据(在主调度程序上)

我有两个问题

a) 如果validateData失败,我该如何取消当前发射并继续下一个发射?

b) 有没有办法让在 viewModel 上工作的 dataFlow 订阅者生成新线程,以便延迟部分可以并行运行?

现在的时间线看起来像第一部分,但我希望它像第二部分一样运行 执行时间表

有没有办法做到这一点?

我尝试使用 buffer(),正如文档所述,“通过指定容量的通道缓冲流量排放并在单独的协程中运行收集器”。但是当我将它设置为 BufferOverflow.SUSPEND 我得到第一部分的行为,当我将它设置为 BufferOverflow.DROP_OLDEST 或 BufferOverflow.DORP_LATEST 我失去了排放

我也尝试过像这样使用 .conflate() :

repo.dataFlow
    .conflate()
    .map { ....

即使排放一个接一个地开始,有延迟的部分仍然等待前一个完成,然后再开始下一个

当我对那部分使用 .flowOn(Dispatchers.Default) 时,我会释放排放,当我使用 .flowOn(Dispatchers.IO) 或类似的东西时,Executors.newFixedThreadPool(4).asCoroutineDispatcher()他们总是等待前一个完成后再开始新的

编辑2:

经过大约 3 小时的实验,这似乎有效

 viewModelScope.launch(Dispatchers.Default) {
        repo.dataFlow
            .map {
                validateData(it)
            }
            .flowOn(Dispatchers.Default)
            .map {
                async {
                    delay(1000)
                    it
                }
            }
            .flowOn(Dispatchers.IO) // NOTE (A)
            .map {
                val result = it.await()
                transformData(result)
            }
            .flowOn(Dispatchers.Default)
            .collect {
                _readings.postValue(it)
            }
    }

但是我仍然没有弄清楚如果验证数据失败如何取消发射

并且由于某种原因,它仅在我使用 Dispatchers.IO 、 Executors.newFixedThreadPool(20).asCoroutineDispatcher() 和 Dispatchers.Unconfined 时才有效,我在注释 (A) 的位置, Dispatchers.Main 似乎不起作用(我预期的)但是Dispatchers.Default 似乎也不起作用,我不知道为什么

4

1 回答 1

1

第一个问题:好吧,从继续收集流的意义上讲,您无法从异常中恢复,根据文档“当操作符内的发射器或代码抛出异常时,流收集可以完成异常。” 因此,一旦抛出异常,集合就完成了(例外地),但是您可以通过将集合包装在 try/catch 块中或使用 catch() 运算符来处理异常。

第二个问题:你不能,虽然生产者(发射端)可以通过使用 buffer() 操作符并发,但收集总是顺序的。

根据您的图表,您需要扇出(一个生产者,许多消费者),您无法通过流来实现。流是冷的,每次你从它们收集时,它们从一开始就开始散发。

可以使用通道来实现扇出,在通道中,您可以让一个协程产生值,而许多协程消耗这些值。

编辑:哦,你的意思是验证失败不是函数本身,在这种情况下你可以使用filter()操作符。

BroadcastChannel 和 ConflatedBroadcastChannel 正在被弃用。SharedFlow 无法在您的用例中为您提供帮助,因为它们以广播方式发出值,这意味着生产者会等到所有消费者消费完每个值之后再生产下一个值。那仍然是顺序的,您需要并行性。produce()您可以使用通道构建器来实现它。

一个简单的例子:

val scope = CoroutineScope(Job() + Dispatchers.IO)

val producer: ReceiveChannel<Int> = scope.produce {
    var counter = 0
    val startTime = System.currentTimeMillis()
    while (isActive) {
        counter++
        send(counter)
        println("producer produced $counter at ${System.currentTimeMillis() - startTime} ms from the beginning")
        delay(250)
    }
}

val consumerOne = scope.launch {
    val startTime = System.currentTimeMillis()
    for (x in producer) {
        println("consumerOne consumd $x at ${System.currentTimeMillis() - startTime}ms from the beginning.")
        delay(1000)
    }
}

val consumerTwo = scope.launch {
    val startTime = System.currentTimeMillis()
    for (x in producer) {
        println("consumerTwo consumd $x at ${System.currentTimeMillis() - startTime}ms from the beginning.")
        delay(1000)
    }
}

val consumerThree = scope.launch {
    val startTime = System.currentTimeMillis()
    for (x in producer) {
        println("consumerThree consumd $x at ${System.currentTimeMillis() - startTime}ms from the beginning.")
        delay(1000)
    }
}

观察生产和消费时间。

于 2021-08-10T15:45:15.100 回答