1

我有一个到目前为止我无法解决的问题我是 RxKotlin 的新手,所以它可能很容易。请看一下代码:

    override fun infos(): Stream<Info> =
        client.infoAboutItem(identifier)
                .map {
                    val itemId = it.itemId ?: ""
                    val item = client.itemForId(itemId)
                    ClientInfo(client, it, source, item) as Info
                }
                .let { AccessStream(it) }

其中stream是我们自制的集合。Map 是一种允许您遍历该集合中的每个项目的方法。

这里的问题是

 client.itemForId(itemId)

是一个 http 调用,它返回一个不理想的 Single。

我想在 map 中创建一个异步调用,该调用将返回 Item 而不是 Single,然后将其传递给 ClientInfo。到目前为止,我尝试过的是在地图内使用订阅并使用 blockingGet() 方法,但这会阻塞主线程,即使我在不​​同的线程上观察和订阅

所以它涉及对集合中的每一件事进行异步调用。

感谢帮助

4

2 回答 2

1

您可以尝试返回Observable<Stream<Info>>,然后它看起来像:

   override fun infos(): Observable<Stream<Info>> = 
                Observable.from(client.infoAboutItem(identifier))
                        .flatMapSingle {
                            val itemId = it.itemId ?: ""
                            client.itemForId(itemId)
                        }
                        .map { 
                            ClientInfo(client, it, source, item) as Info
                         }
                        .toList()
                        .flatMap {
                            AccessStream(it)
                        }
于 2017-08-31T07:33:04.863 回答
0

您应该将这些昂贵的操作包装到 observable 中,并使用平面地图将这些数据压缩到 Client Info 中。

我写了一个小样本来展示它。

class SimpleTest {
  val testScheduler = TestScheduler()

  @Test
  fun test() {
    infos().observeOn(Schedulers.immediate())
        .subscribe { logger("Output", it.toString()) }

    testScheduler.advanceTimeBy(10, TimeUnit.MINUTES)
  }

  fun infos(): Single<List<ClientInfo>> {
    return Observable.from(infoAboutItem("some_identifier"))
        .doOnNext { logger("Next", it.toString()) }
        .flatMap { aboutItem ->
          Observable.fromCallable { itemForId(aboutItem.itemId) }
              .subscribeOn(testScheduler)
              .map { ClientInfo(aboutItem = aboutItem, item = it) }
        }
        .doOnNext { logger("Next", it.toString()) }
        .toList()
        .toSingle()
  }

  data class ClientInfo(
      val id: String = UUID.randomUUID().toString(),
      val aboutItem: AboutItem,
      val item: Item
  )

  data class AboutItem(val itemId: String = UUID.randomUUID().toString())
  data class Item(val id: String = UUID.randomUUID().toString())

  fun infoAboutItem(identifier: String): List<AboutItem> {
    return (1..10).map { AboutItem() }
  }

  fun itemForId(itemId: String): Item {
    val sleepTime = Random().nextInt(1000).toLong()
    Thread.sleep(sleepTime)
    return Item()
  }

  fun logger(tag: String, message: String): Unit {
    val formattedDate = Date(Schedulers.immediate().now()).format()
    System.out.println("$tag @ $formattedDate: $message")
  }

  fun Date.format(): String {
    return SimpleDateFormat("HH:mm:ss.SSS", Locale.US).format(this)
  }
}
于 2017-08-31T19:59:08.573 回答