95

为什么我要使用 Kotlin 的协程?

似乎 RxKotlin 库的用途更加广泛。相比之下,Kotlin 的协程看起来没有那么强大,使用起来也更麻烦。

我的观点基于Andrey Breslav (JetBrains) 的这个设计演讲

演讲中的幻灯片可在此处访问。


编辑(感谢@hotkey):

更好地了解协程的当前状态

4

5 回答 5

111

免责声明:这个答案的一部分是无关紧要的,因为协程现在有流 API,与 Rx 非常相似。如果您想要最新的答案,请跳到最后的编辑。

Rx 中有两个部分;Observable 模式,以及一组可靠的操作符来操作、转换和组合它们。Observable 模式本身并没有做很多事情。与协程相同;这只是处理异步的另一种范式。您可以比较回调、Observable 和协程解决给定问题的优缺点,但不能将范例与功能齐全的库进行比较。这就像将语言与框架进行比较。

Kotlin 协程如何优于 RxKotlin?还没有使用协程,但它看起来类似于 C# 中的 async/wait。您只需编写顺序代码,一切就像编写同步代码一样简单……除了它异步执行。它更容易掌握。

为什么我要使用 kotlin 协程?我会为自己回答。大多数时候我会坚持使用 Rx,因为我更喜欢事件驱动的架构。但是如果出现我正在编写顺序代码的情况,并且我需要在中间调用一个异步方法,我会很乐意利用协程来保持这种状态,并避免将所有内容都包装在 Observable 中。

编辑:现在我正在使用协程,是时候更新了。

RxKotlin 只是在 Kotlin 中使用 RxJava 的语法糖,所以我将在下面讨论 RxJava 而不是 RxKotlin。协程是比 RxJava 更低级和更通用的概念,它们服务于其他用例。也就是说,有一个用例可以比较 RxJava 和协程(channel),它异步传递数据。协程在这方面比 RxJava 有明显的优势:

协程更好地处理资源

  • 在 RxJava 中,您可以将计算分配给调度程序,但subscribeOn()ObserveOn()令人困惑。每个协程都被赋予一个线程上下文并返回父上下文。对于一个通道,双方(生产者、消费者)都在自己的上下文中执行。协程在线程或线程池做作上更直观。
  • 协程可以更好地控制这些计算何时发生。例如,对于给定的计算,您可以传递手 ( yield)、优先级 ( select)、并行化 (multiple producer/ actoron channel) 或锁定资源 ( )。Mutex在服务器上(RxJava 首先出现)可能无关紧要,但在资源有限的环境中,可能需要这种级别的控制。
  • 由于它的反应性质,背压在 RxJava 中不太适合。在通道的另一端send()是一个挂起函数,当达到通道容量时会挂起。这是大自然赋予的开箱即用的背压。您也可以offer()使用通道,在这种情况下,调用永远不会暂停,而是false在通道已满时返回,从而有效地onBackpressureDrop()从 RxJava 复制。或者您可以编写自己的自定义背压逻辑,这对于协程来说并不困难,尤其是与使用 RxJava 做同样的事情相比。

还有另一个用例,协程大放异彩,这将回答您的第二个问题“我为什么要使用 Kotlin 协程?”。AsyncTask协程是后台线程或(Android)的完美替代品。这很容易launch { someBlockingFunction() }。当然,你也可以使用 RxJava 来实现这一点,Schedulers也许可以使用Completable。您不会(或很少)使用观察者模式和作为 RxJava 签名的运算符,这暗示这项工作超出了 RxJava 的范围。RxJava 的复杂性(这里是无用的税)会使你的代码比 Coroutine 的版本更冗长、更不干净。

可读性很重要。在这方面,RxJava 和协程方法有很大不同。协程比 RxJava 更简单。如果您对map(),flatmap()和一般的函数式反应式编程不满意,则协程操作更容易,涉及基本指令:for, if, try/catch... 但我个人发现协程的代码对于非平凡的任务更难理解。尤其是它涉及更多的嵌套和缩进,而 RxJava 中的运算符链使所有内容保持一致。函数式编程使处理更加明确。最重要的是,RxJava 可以使用来自其丰富(好吧,太丰富)运算符集的一些标准运算符来解决复杂的转换。当您拥有需要大量组合和转换的复杂数据流时,RxJava 会大放异彩。

我希望这些考虑将帮助您根据需要选择正确的工具。

编辑:协程现在有流程,一个非常非常类似于 Rx 的 API。可以比较每种的优缺点,但事实是差异很小。

协程的核心是一种并发设计模式,带有附加库,其中一个是类似于 Rx 的流 API。显然,Coroutines 的范围比 Rx 要广泛得多,Coroutines 能做的事情有很多 Rx 做不到的,我不能一一列举。但通常如果我在我的一个项目中使用协程,归结为一个原因:

协程更擅长从代码中删除回调

我避免使用过多损害可读性的回调。协程使异步代码简单易写。通过利用 suspend 关键字,您的代码看起来像同步代码。

我已经看到在项目中使用 Rx 主要用于替换回调的相同目的,但是如果您不打算修改架构以提交响应式模式,那么 Rx 将是一个负担。考虑这个接口:

interface Foo {
   fun bar(callback: Callback)
}

Coroutine 等价物更加明确,返回类型和关键字 suspend 表明它是一个异步操作。

interface Foo {
   suspend fun bar: Result
}

但是 Rx 等效项有一个问题:

interface Foo {
   fun bar: Single<Result>
}

当您在回调或协程版本中调用 bar() 时,您会触发计算;使用 Rx 版本,您可以获得可以随意触发的计算表示。您需要调用 bar() 然后订阅 Single。通常没什么大不了的,但对于初学者来说有点混乱,并且可能导致微妙的问题。

此类问题的一个示例,假设回调 bar 函数是这样实现的:

fun bar(callback: Callback) {
   setCallback(callback)
   refreshData()
}

如果您没有正确移植它,您将以一个只能触发一次的 Single 结束,因为 refreshData() 是在 bar() 函数中调用的,而不是在订阅时调用的。一个初学者的错误,理所当然,但问题是 Rx 不仅仅是一个回调替代品,许多开发人员都在努力掌握 Rx。

如果您的目标是将异步任务从回调转换为更好的范例,则协程是完美的选择,而 Rx 会增加一些复杂性。

于 2017-02-09T02:00:06.523 回答
87

Kotlin 协程与 Rx 不同。很难将它们进行比较,因为 Kotlin 协程是一种精简的语言功能(只有几个基本概念和一些操作它们的基本函数),而 Rx 是一个相当繁重的库,种类繁多即用型运算符。两者都旨在解决异步编程的问题,但是它们的解决方法非常不同:

  • Rx 带有一种特殊的函数式编程风格,几乎可以在任何编程语言中实现,而无需语言本身的支持。当手头的问题很容易分解为一系列标准运算符时,它运行良好,否则效果不佳。

  • Kotlin 协程提供了一种语言特性,可以让库编写者实现各种异步编程风格,包括但不限于函数式反应风格 (Rx)。使用 Kotlin 协程,您还可以以命令式风格、基于 promise/futures 的风格、actor 风格等编写异步代码。

将 Rx 与一些基于 Kotlin 协程实现的特定库进行比较更合适。

kotlinx.coroutines 库为例。这个库提供了一组原语async/await和通道,这些原语通常被嵌入到其他编程语言中。它还支持轻量级的无未来演员。您可以通过示例阅读 kotlinx.coroutines 指南中的更多内容。

在某些用例中,由 提供的通道kotlinx.coroutines可以替换或增加 Rx。有一个单独的协同程序反应流指南,它更深入地介绍了与 Rx 的异同。

于 2017-05-04T12:42:10.613 回答
75

我非常了解 RxJava,最近我切换到 Kotlin Coroutines 和 Flow。

RxKotlin 与 RxJava 基本相同,只是添加了一些语法糖以使其在 Kotlin 中编写 RxJava 代码更加舒适/惯用。

RxJava 和 Kotlin 协程之间的“公平”比较应该包括 Flow,我将在这里尝试解释原因。这会有点长,但我会尽量用例子来保持它尽可能简单。

使用 RxJava 你有不同的对象(从版本 2 开始):

// 0-n events without backpressure management
fun observeEventsA(): Observable<String>

// 0-n events with explicit backpressure management
fun observeEventsB(): Flowable<String>

// exactly 1 event
fun encrypt(original: String): Single<String>

// 0-1 events
fun cached(key: String): Maybe<MyData>

// just completes with no specific results
fun syncPending(): Completable

在 kotlin coroutines + flow 中,您不需要很多实体,因为如果您没有事件流,您可以只使用简单的协程(挂起函数):

// 0-n events, the backpressure is automatically taken care off
fun observeEvents(): Flow<String>

// exactly 1 event
suspend fun encrypt(original: String): String

// 0-1 events
suspend fun cached(key: String): MyData?

// just completes with no specific results
suspend fun syncPending()

奖励:Kotlin Flow / Coroutines 支持null值(RxJava 2 移除了支持)

暂停函数顾名思义:它们是可以暂停代码执行并在函数完成后继续执行的函数;这使您可以编写感觉更自然的代码。

运营商呢?

在 RxJava 中,你有很多运算符(map, filter, flatMap, switchMap, ...),其中大多数都有对应于每个实体类型的版本(Single.map(), Observable.map(), ...)。

Kotlin Coroutines + Flow不需要那么多操作符,让我们看看为什么用一些最常见的操作符的例子

地图()

RxJava:

fun getPerson(id: String): Single<Person>
fun observePersons(): Observable<Person>

fun getPersonName(id: String): Single<String> {
  return getPerson(id)
     .map { it.firstName }
}

fun observePersonsNames(): Observable<String> {
  return observePersons()
     .map { it.firstName }
}

Kotlin 协程 + 流

suspend fun getPerson(id: String): Person
fun observePersons(): Flow<Person>

suspend fun getPersonName(id: String): String? {
  return getPerson(id).firstName
}

fun observePersonsNames(): Flow<String> {
  return observePersons()
     .map { it.firstName }
}

对于“单一”案例,您不需要运算符,并且该Flow案例非常相似。

平面图()

flatMap操作符和他的兄弟姐妹,switchMap存在contactMap允许你组合不同的 RxJava 对象,从而在映射你的事件时执行潜在的异步代码。

假设您需要,对于每个人,从数据库(或远程服务)中获取它的保险

RxJava

fun fetchInsurance(insuranceId: String): Single<Insurance>

fun getPersonInsurance(id: String): Single<Insurance> {
  return getPerson(id)
    .flatMap { person ->
      fetchInsurance(person.insuranceId)
    }
}

fun observePersonsInsurances(): Observable<Insurance> {
  return observePersons()
    .flatMap { person ->
      fetchInsurance(person.insuranceId) // this is a Single
          .toObservable() // flatMap expect an Observable
    }
}

让我们看看 Kotlin Coroutiens + Flow

suspend fun fetchInsurance(insuranceId: String): Insurance

suspend fun getPersonInsurance(id: String): Insurance {
  val person = getPerson(id)
  return fetchInsurance(person.insuranceId)
}

fun observePersonsInsurances(): Flow<Insurance> {
  return observePersons()
    .map { person ->
      fetchInsurance(person.insuranceId)
    }
}

像以前一样,在简单的协程情况下,我们不需要运算符,我们只需像不异步的情况下那样编写代码,只需使用挂起函数即可。

Flow不是一个错字,不需要flatMap运算符,我们可以使用map. 原因是 map lambda 是一个挂起函数!我们可以在其中执行挂起代码!!!

为此,我们不需要其他操作员。

我在这里作弊了一点

Rx flatMapswitchMap并且concatMap行为略有不同。RxflatMap为每个事件生成一个新流,然后将它们合并在一起:您在输出中收到的新流事件的顺序未确定,它可能与输入中的顺序或事件不匹配

Rx concatMap“修复”了这个问题,并保证您将按照输入事件的相同顺序获取每个新流

RxswitchMap将在收到新事件时处理任何先前运行的流,只有收到的最后一个输入与此操作符有关

所以你看,这不是真的Flow.map,它实际上更类似于 Rx concatMap,这是你期望从地图操作员那里得到的更自然的行为。

但是确实你需要更少的操作符,在 map 里面你可以做任何你想要的异步操作并重现它的行为,flatMap因为它是一个可暂停的函数。RxJava 的实际等效运算符flatMapFlow.flatMapMergeoperator。

RxJava 的等价物switchMap可以在 Flow 中通过在conflate()操作符之前使用操作符来实现map

对于更复杂的东西,您可以使用 Flowtransform()运算符,它为每个事件发出您选择的 Flow。

每个 Flow 操作员都接受暂停功能!

在上一段中我告诉过你我作弊了。但是我所说的Flow 不需要那么多操作符的关键在于大多数操作符的回调都是挂起函数。

所以说你需要,filter()但你的过滤器需要执行网络调用来知道你是否应该保留这个值,使用 RxJava 你需要将多个运算符与不可读的代码结合起来,使用 Flow 你可以使用filter()

fun observePersonsWithValidInsurance(): Flow<Person> {
  return observerPersons()
    .filter { person ->
        val insurance = fetchInsurance(person.insuranceId) // suspending call
        insurance.isValid()
    }
}

延迟(),开始(),连接(),...

在 RxJava 中,您有许多运算符用于在前后应用延迟或添加项目:

  • 延迟()
  • 延迟订阅()
  • 开始(T)
  • 开始(可观察)
  • 连接(...)

使用 kotlin Flow,您可以简单地:

grabMyFlow()
  .onStart {
    // delay by 3 seconds before starting
    delay(3000L)
    // just emitting an item first
    emit("First item!")
    emit(cachedItem()) // call another suspending function and emit the result
  }
  .onEach { value ->
    // insert a delay of 1 second after a value only on some condition
    if (value.length() > 5) {
      delay(1000L)
    }
  }
  .onCompletion {
    val endingSequence: Flow<String> = grabEndingSequence()
    emitAll(endingSequence)
  }

错误处理

RxJava 有很多操作符来处理错误:

  • onErrorResumeWith()
  • onErrorReturn()
  • onErrorComplete()

使用 Flow,您只需要操作符catch()

  grabMyFlow()
    .catch { error ->
       // emit something from the flow
       emit("We got an error: $error.message")
       // then if we can recover from this error emit it
       if (error is RecoverableError) {
          // error.recover() here is supposed to return a Flow<> to recover
          emitAll(error.recover())
       } else {
          // re-throw the error if we can't recover (aka = don't catch it)
          throw error
       }
    }

并且具有暂停功能,您可以使用try {} catch() {}.

您可以使用单个catch运算符实现所有 RxJava 错误运算符,因为您获得了一个挂起功能。

易于编写流运算符

由于协同程序在引擎盖下为 Flow 提供动力,因此编写运算符更容易。如果你曾经检查过 RxJava 运算符,你会发现它有多难以及你需要学习多少东西。

编写 Kotlin Flow 运算符更容易,您只需在此处查看已经是 Flow 一部分的运算符的源代码即可了解。原因是协程使编写异步代码更容易,并且操作符使用起来更自然。

作为奖励,Flow 运算符都是 kotlin 扩展函数,这意味着您或库都可以轻松添加运算符,并且使用起来不会感到奇怪(在 RxJava 中observable.lift()observable.compose()需要组合自定义运算符)。

上游线程不向下游泄漏

这甚至意味着什么?

这解释了为什么在 RxJava 中你有subscribeOn()observeOn()而在 Flow 中你只有flowOn().

让我们以这个 RxJava 为例:

urlsToCall()
  .switchMap { url ->
    if (url.scheme == "local") {
       val data = grabFromMemory(url.path)
       Flowable.just(data)
    } else {
       performNetworkCall(url)
        .subscribeOn(Subscribers.io())
        .toObservable()
    }
  }
  .subscribe {
    // in which thread is this call executed?
  }

那么回调在哪里subscribe执行呢?

答案是:

要看...

如果它来自网络,则它位于 IO 线程中;如果它来自另一个未定义的分支,则取决于用于发送 url 的线程。

如果您考虑一下,您编写的任何代码:您不知道它将在哪个线程中执行:始终取决于调用者。这里的问题是线程不再依赖于调用者,它依赖于内部函数调用的作用。

假设你有这个简单的标准代码:

fun callUrl(url: Uri) {
  val callResult = if (url.scheme == "local") {
    grabFromMemory(url.path)
  } else {
    performNetworkCall(url)
  }
  return callResult
}

return callResult想象一下,如果不查看内部grabFromMemory(),就无法知道行在哪个线程中执行performNetworkCall()

想一想:根据您调用的函数以及它们在内部执行的操作来更改线程。

回调 API 经常发生这种情况:除非记录在案,否则您无法知道您提供的回调将在哪个线程中执行。

这就是“上游线程向下游泄漏”的概念。

对于 Flow 和 Coroutines,情况并非如此,除非您明确要求这种行为(使用Dispatchers.Unconfined)。

suspend fun myFunction() {
  // execute this coroutine body in the main thread
  withContext(Dispatchers.Main) {
    urlsToCall()
      .conflate() // to achieve the effect of switchMap
      .transform { url ->
        if (url.scheme == "local") {
           val data = grabFromMemory(url.path)
           emit(data)
        } else {
           withContext(Dispatchers.IO) {
             performNetworkCall(url)
           }
        }
      }
      .collect {
        // this will always execute in the main thread
        // because this is where we collect,
        // inside withContext(Dispatchers.Main)
      }
  }
}

协程代码将在它们被执行的上下文中运行。并且只有网络调用的部分会在 IO 线程上运行,而我们在这里看到的其他所有内容都会在主线程上运行。

好吧,实际上,我们不知道里面的代码grabFromMemory()会在哪里运行,但我们并不关心:我们知道它将在主线程中被调用,在那个挂起函数中我们可以使用另一个 Dispatcher,但我们知道什么时候它将返回结果,val data这将再次出现在主线程中。

这意味着,查看一段代码,更容易判断它将在哪个线程中运行,如果您看到显式 Dispatcher = 它就是那个调度程序,如果您没有看到它:在任何线程调度程序中,您正在查看的挂起调用正在被调用。

结构化并发

这不是 kotlin 发明的概念,但这是他们比我所知道的任何其他语言都更接受的东西。

如果我在这里解释的内容不足以让您阅读本文或观看此视频

那是什么?

使用 RxJava,你订阅了 observable,它们给了你一个Disposable对象。

当不再需要它时,您需要小心处理它。因此,您通常做的是保留对它的引用(或将其放在 a 中),以便以后不再需要时CompositeDisposable调用它。dispose()如果你不这样做,linter 会给你一个警告。

RxJava 比传统的线程要好一些。当您创建一个新线程并在其上执行某些操作时,这是“一劳永逸”,您甚至没有办法取消它:Thread.stop()已被弃用,有害,而且最近的实现实际上什么都不做。Thread.interrupt()使您的线程失败等。任何异常都会丢失..您得到了图片。

使用 kotlin 协程和流程,它们颠倒了“一次性”的概念。你不能创建一个没有CoroutineContext.

这个上下文定义了scope你的协程。在其中生成的每个子协程都将共享相同的范围。

如果您订阅流程,您必须在协程内或提供范围。

您仍然可以保留对您启动的协程 ( Job) 的引用并取消它们。这将自动取消该协程的每个子进程。

如果您是 Android 开发人员,他们会自动为您提供这些范围。示例:viewModelScope并且您可以在具有该范围的 viewModel 内启动协程,知道在清除 viewmodel 时它们将自动取消。

viewModelScope.launch {
  // my coroutine here
}

如果任何孩子失败,一些作用域将终止,另一些作用域将让每个孩子离开他自己的生命周期,而不会在一个失败时停止其他孩子(SupervisedJob)。

为什么这是一件好事?

让我试着像Roman Elizarov那样解释它。

一些旧的编程语言有这个概念goto,基本上可以让你随意从一行代码跳转到另一行代码。

非常强大,但如果被滥用,你最终可能会得到非常难以理解的代码,难以调试和推理。

因此,新的编程语言最终将其从语言中完全删除。

当你使用ifor whileorwhen时,更容易对代码进行推理:不管这些块内部发生了什么,你最终都会从它们中走出来,这是一个“上下文”,你没有奇怪的跳进跳出.

启动线程或订阅 RxJava observable 类似于 goto:您正在执行的代码将继续运行,直到“其他地方”停止。

使用协程,通过要求您提供上下文/范围,您知道当您的范围覆盖所有内容时,协程将在您的上下文完成时完成,无论您有单个协程还是 10000 个协程都没有关系。

您仍然可以通过使用“转到”协程GlobalScope,出于同样的原因,您不应该goto在提供它的语言中使用它。

冷与热 - ShareFlow 和 StateFlow

当我们使用响应式流时,我们总是有冷流和热流的概念。这些是 Rx 世界和 Kotlin Flows 的概念

冷流就像我们代码中的一个函数:它就在那里,在你调用它之前什么都不做。使用 Flow 意味着它定义了流的作用,但在您开始收集它之前它什么也不做。而且,就像一个函数,如果你收集(调用)它两次,流将运行两次。(例如,如果收集两次,执行 http 请求的冷流将执行两次请求)。

热流不是那样工作的。当您对它们进行多次对方付费呼叫时,它们都在引擎盖下共享相同的热流,这意味着您的热流运行一次,您可以拥有多个观察者。

您通常可以使用某些运算符将冷流转换为热流。

在 RxJava 上,您可以使用Connectable Observable/Flowable 的概念。

val coldObservable: Observable<Something> = buildColdObservable()

// create an hot observable from the cold one
val connectableObservable: ConnectableObservable<Something> = coldObservable.publish()

// you can subscribe multiple times to this connectable
val subADisposable: Disposable = connectableObservable.subscribe(subscriberA)
val subBDisposable: Disposable = connectableObservable.subscribe(subscriberB)

// but nothing will be emitted there until you call
val hotDisposable: Disposable = connectableObservable.connect()

// which actually run the cold observable and share the result on bot subscriberA and subscriberB

// while it's active another one can start listening to it
val subCDisposable: Disposable = connectableObservable.subscribe(subscriberC)

当第一个订阅者连接时,您不会有其他有用的操作员喜欢refCount()或自动autoConnect()将其转回Connectable标准流和引擎盖下。.connect()


buildColdObservable()
   .replay(1) // when a new subscriber is attached receive the last data instantly
   .autoConnect() // keep the cold observable alive while there's some subscriber

在 Flow上,您有shareIn()stateIn()运算符。您可以在此处查看 API 设计。当您“连接”时,它们在处理时不那么“手动”。

buildColdFlow()
  .shareIn(
    // you need to specify a scope for the cold flow subscription
    scope = myScope,
    // when to "connect"
    started = SharingStarted.WhileSubscribed(),
    // how many events already emitted should be sent to new subscribers
    replay = 1,
  )

范围

范围适用于结构化并发。在 RxJava 上,它connect()是实际订阅冷 observable 的操作,它为您提供了一个Disposable您必须.dispose()在某个地方调用的操作。如果您使用refCount()autoConnect()在第一个订阅者上调用它,并且refCount()永远不会释放 with,而autoConnect()当没有更多订阅者时释放 with。

使用 Flow,您需要提供一个专用的 Scope 来收集冷流,如果您取消该范围,冷流将停止发射并且不再可用。

开始

所以这个很简单

  • RxJava refCount()--> Flow SharingStarted.Lazily,开始收集第一个订阅者
  • RxJava autoConnect()-> Flow SharingStarted.WhileSubscribed(),开始收集第一个订阅者并在没有订阅者时取消它
  • connect()在任何订阅 -> Flow 之前手动调用 RxJava SharingStarted.Eagerly(),立即开始收集

WhileSubscribed()有有用的参数,检查出来

您还可以定义自己的逻辑,SharingStarted以便在从冷流收集时进行处理。

行为和背压

当你有一个 hot observable 时,你总是需要处理背压问题。1 个数据源被多种方式监听,一个监听器可能比其他监听器慢。

Flow.shareIn默认在一个专用的协程中收集冷流并缓冲发射。这意味着如果冷流发射得太快,它将使用缓冲区。您可以更改此行为。

SharedFlow如果需要,Kotlin还允许您直接访问重播缓冲区以检查先前的发射。

取消订阅者不会影响共享流。

使用flowOn()来更改Dispatcher订阅者不会对共享流产生影响(flowOn()如果您需要在某些特定调度程序中运行冷流,请在共享之前使用)

状态输入

Flow 有一个ShareFlow称为“特殊”版本的版本,StateFlow您可以使用stateIn()它从另一个流中创建一个。

AStateFlow总是有 1 个值,它不能为“空”,所以你需要在做的时候提供初始值stateIn()

AStateFlow永远不能抛出异常并且永远不能终止(这种方式类似于BehaviorRelayRxRelay 库中的方式)

AStateFlow只会在状态发生变化时发出(就像它有一个内置的distinctUntilChanged().

RxJava Subjects 与 Mutable*Flow RxJava 中的 ASubject是一个类,您可以使用它手动将数据推送到其上,同时仍将其用作流。

在 Flow 中你可以使用MutableSharedFloworMutableStateFlow来达到类似的效果。

您也可以使用 Kotlin 协程,Channels但它们被认为是较低级别的 API。

有什么缺点吗?

Flow 仍在开发中,RxJava 中可用的一些功能可能在 Kotlin Coroutines Flow 中被标记为实验性的,或者在这里和那里有一些不同。

某些利基运算符或运算符功能可能尚未实现,您可能必须自己实现它(至少它更容易)。

但除此之外,我所知道的没有任何缺点。

但是有一些差异需要注意,这可能会导致从 RxJava 切换时出现一些摩擦,并且需要您学习新事物。

结构化并发向前迈进了一步,但引入了您需要学习和习惯的新概念(范围、supervisorJob):取消的处理方式完全不同。

有一些问题需要注意。

问题:取消异常

如果您cancel()在协程中工作或throw CancellationException()异常传播到父协程,除非您使用了主管范围/作业。如果发生这种情况,父协程也会取消被取消的协程的兄弟协程。

但是如果你catch(e: Exception),即使使用runCatching {},你也必须记住重新抛出,CancellationException()否则你会得到意想不到的结果,因为协程已被取消,但你的代码仍在尝试执行,就像它不是一样。

问题:UncaughtExceptionHandler

如果您launch { ... }创建了一个新的协程并且该协程默认抛出,这将终止协程但不会使应用程序崩溃,您可能会完全错过一些问题。

此代码不会使您的应用崩溃。

launch {
  throw RuntimeException()
}

在某些情况下,它甚至可能不会在日志中打印任何内容。

如果是取消异常,它肯定不会在日志中打印任何内容。

于 2020-09-17T17:56:58.807 回答
6

您链接的谈话/文档没有谈论频道。通道填补了您当前对协程和事件驱动编程的理解之间的空白。

使用协程和通道,您可以进行事件驱动编程,就像您可能习惯使用 rx 一样,但是您可以使用看起来同步的代码来完成它,而无需使用尽可能多的“自定义”运算符。

如果您想更好地理解这一点,我建议您看看 kotlin 之外,这些概念更成熟和精致(不是实验性的)。core.async从 Clojure、Rich Hickey 视频、帖子和相关讨论中查看。

于 2017-11-11T07:07:56.760 回答
3

协程旨在提供一个轻量级的异步编程框架。在启动异步作业所需的资源方面是轻量级的。协程不强制使用外部 API,对用户(程序员)来说更自然。相比之下,RxJava + RxKotlin 有一个额外的数据处理包,它在 Kotlin 中并不真正需要,它在标准库中有一个非常丰富的 API,用于序列和集合处理。

如果您想了解更多关于 Android 上协程的实际使用,我可以推荐我的文章: https ://www.netguru.com/codestories/android-coroutines-%EF%B8%8Fin-2020

于 2019-11-20T09:10:57.567 回答