1

我正在构建一个聊天应用程序,并且遇到以下情况:在我的数据库中,我有 3 个数据源,但其中两个依赖于第一个。

A -> 是联系人
B -> 是最后一条未读消息
C -> 是消息数

我需要先获取联系人,然后使用其 ID 获取另外两个。要求是持续关注联系信息中的任何数据更改,或未读消息或消息计数。我如何使用 RxJava 只更新必要的不阻塞 UI 来做到这一点?(有些人告诉我我可以为此使用 Flowable)。

到目前为止我已经尝试过:

fun queryAllChats(): Flowable<MutableList<Chat>> =
dao.queryContactsFlowable().flatMap { contacts ->
        Flowable.fromIterable(contacts)
            .flatMapSingle { contact ->
                getLastUnreadMessage(contact.id)
                    .materialize()
                    .zipWith(
                        getUnreadCount(contact.id)
                    ) { msg: Notification<Messages>, unreadCount: Int ->
                        Chat(contact, msg.value, unreadCount)
                    }
            }.toList().toFlowable()
    }.subscribeOn(schedulers.io).observeOn(schedulers.main)

在视图模型中

var test = LiveDataReactiveStreams.fromPublisher(queryAllChats())

但它似乎只更新一次,然后不再更新任何数据。

4

1 回答 1

0

您的代码中的问题是,您只能观察联系人列表的变化,并且每个联系人只能获得一次最后的消息和未读计数。

这应该有效:

  fun queryAllChats() = dao.queryContactsFlowable()
    .switchMap { contacts ->
      val chats = contacts.map { contact ->
        Flowable.combineLatest(
          getUnreadCount(contact.id), // Flowable
          getLastUnreadMessage(contact.id), // Flowable
          { unreadCount, unreadMessages ->
            Chat(contact, unreadMessages, unreadCount)
          })
      }
      return@switchMap Flowable.combineLatest(chats) {
        it.map { it as Chat }
      }
    }
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
于 2021-09-06T15:04:04.443 回答