1

我正在尝试producer为听众创作。我的代码看起来像这样

suspend fun foo() = produce{
    someEvent.addListener {
        this.send(it)
    }
}

但是我遇到了错误Suspension functions can be called only within coroutine,这是有道理的。我的问题是。有没有办法使用协程来实现这种模式?

4

1 回答 1

3

有几种方法可以实现它,具体取决于您要实现的目标:

如果您只想接收最近的事件,那么您应该使用一个组合的通道和offer方法,它会成功:

fun foo() = produce<T>(capacity = Channel.CONFLATED) {
    someEvent.addListener {
        offer(it)
    }
}

如果接收所有事件至关重要,那么您的选择取决于事件生产者的行为。这里要思考的关键问题是,如果您的事件生产者开始“不间断”地产生大量事件,会发生什么。根据经验,大多数“同步”事件生产者不支持显式背压信号,但它们仍然支持隐式背压信号——如果它们的侦听器速度较慢或阻塞线程,它们会减慢速度。因此,通常,以下解决方案非常适合同步事件生产者:

fun foo() = produce<T>() {
    someEvent.addListener {
        runBlocking { send(it) }
    }
}

如果您有一次生成一批事件并且您不想阻止生产者但让消费者按照自己的节奏处理它们的情况,您还可以指定一些积极capacity = xxx的参数作为构建器的性能优化。produce

在极少数情况下,当您的生产者不理解隐式阻塞背压信号时(当它作为某种多线程装置在没有内部同步的情况下猛烈地产生事件时),那么您可以使用具有无限容量的通道offer,但请注意您如果生产者超过消费者,则可能会耗尽内存:

fun foo() = produce<T>(capacity = Channel.UNLIMITED) {
    someEvent.addListener {
        offer(it)
    }
}

如果您的生产者支持明确的背压信号(如功能性反应流),那么您应该使用特殊的适配器将其背压信号正确传输到协程/从协程传输。该kotlinx.coroutines库有许多开箱即用的集成模块以及用于此目的的各种反应库。见这里

注意:不应该用修饰符标记你的foo函数。suspend无论如何调用foo都不会挂起调用者。它只是立即(同步)启动生产者协程。

要了解有关协程和不同类型通道的更多信息,我强烈建议您阅读 kotlinx.coroutines 上的指南

于 2017-10-12T09:56:28.450 回答