我正在构建一个聊天应用程序,并且遇到以下情况:在我的数据库中,我有 3 个数据源,但其中两个依赖于第一个。
A -> 是联系人
B -> 是最后一条未读消息
C -> 是消息数
我需要先获取联系人,然后使用其 ID 获取另外两个。要求是持续关注联系信息中的任何数据更改,或未读消息或消息计数。我如何使用 RxJava 只更新必要的不阻塞 UI 来做到这一点?(有些人告诉我我可以为此使用 Flowable)。
到目前为止我已经尝试过:
fun queryAllChats(): Flowable<MutableList<Chat>> =
dao.queryContactsFlowable().flatMap { contacts ->
Flowable.fromIterable(contacts)
.flatMapSingle { contact ->
getLastUnreadMessage(contact.id)
.materialize()
.zipWith(
getUnreadCount(contact.id)
) { msg: Notification<Messages>, unreadCount: Int ->
Chat(contact, msg.value, unreadCount)
}
}.toList().toFlowable()
}.subscribeOn(schedulers.io).observeOn(schedulers.main)
在视图模型中
var test = LiveDataReactiveStreams.fromPublisher(queryAllChats())
但它似乎只更新一次,然后不再更新任何数据。