11

我正在尝试使用Kotlin Flows. 这是我现在正在尝试的

flowOf(
 remoteDataSource.getDataFromCache() // suspending function returning Flow<Data>
   .catch { error -> Timber.e(error) },
 remoteDataSource.getDataFromServer() // suspending function returning Flow<Data>
).flattenConcat().collect {
  Timber.i("Response Received")
}

这里的问题collect是仅在getDataFromServer返回时调用。我的期望是我应该从缓存中获取第一个事件,然后在几毫秒后从服务器获取第二个事件。在这种情况下"Response Received",会打印两次,但会立即打印一次。

在此其他变体"Response Received"中,仅在getDataFromServer()返回后打印一次。

 remoteDataSource.getDataFromCache() // suspending function returning Flow<Data>
  .catch { error -> Timber.e(error) }
  .flatMapConcat {
    remoteDataSource.getDataFromServer() // suspending function returning Flow<Data>
  }
  .collect {
    Timber.i("Response Received")
  }

我之前使用的是 RxJava Flowable.concat(),它运行良好。Kotlin Flows 中有什么东西可以模仿这种行为吗?

4

3 回答 3

3

最近,merge运算符被添加到 Kotlin 协程版本1.3.3中。这是合并的PR

使用合并运算符,您应该能够在结果到达时获得结果。

于 2020-02-13T10:12:16.747 回答
3

这里的问题collect是仅在getDataFromServer返回时调用。

您的设计的第一个问题是 Flow-returning 功能也是可暂停的。这是两层可悬浮性。函数应该在没有任何延迟的情况下返回流,并且流本身应该在它们进入时发出项目。如果你遵循这个指南,你的初始代码已经可以工作了。

按照你编写这些函数的方式,如果你这样写,它们仍然可以工作:

flow<String> {
    emitAll(getCached())
    emitAll(getFromServer())
}

此语句立即完成,返回一个冷流。当你调用collect它时,它首先调用getCached()并发出缓存值,然后调用getFromServer()并发出服务器响应。


上述解决方案仅在您使用缓存值后才启动服务器调用。如果您需要两个流同时处于活动状态,请使用flatMapMerge.

假设您解决了上述基本问题并使您的流量返回功能非暂停,您所需要的只是:

flowOf(getCached(), getFromServer()).flattenMerge()

如果由于某种原因您不能这样做,则必须emitAll在每个调用周围添加包装器:

flowOf(
    flow { emitAll(getCached()) }, 
    flow { emitAll(getFromServer()) }
).flattenMerge()
于 2020-02-13T12:43:19.393 回答
0

事实证明,如果flowOf(someOperation()) someOperation()需要完成下游才能开始处理。它就像Observable.just(someOperation())RxJava世界上。

在第二种情况下flatMapConcat实际上是一个transform运算符,因此它显然返回最终处理的输出。

concatFlow 世界中似乎缺乏类似原生的运算符。这就是我最终解决这个问题的方法

flow {
   remoteDataSource.getDataFromCache()
   .catch { error -> Timber.e(error) }
   .onCompletion {
       remoteDataSource.getDataFromServer()
            .collect {
                 emit(it)
            }
    }.collect { emit(it) }
}
于 2020-02-13T06:53:20.050 回答