5

在 RxJava 2 Flowable 中有不同的背压策略,其中最有趣的是:

  • 最新的
  • 缓冲
  • 降低

在整个 Rx 链中都受到尊重。

在 Kotlin 中有 Flow,它声明它具有开箱即用的背压支持。通过使用以下方法,我能够使 Flow 具有 BUFFER 和 LATEST 策略:

对于缓冲区:

observeFlow()
    .buffer(10)
    .collect { ... }

最新:

observeFlow()
    .conflate()
    .collect { ... }

这只是同一个缓冲区运算符的快捷方式。

但是我找不到任何可以与 DROP 一样工作的东西。简而言之,当先前的值尚未处理时,DROP 将删除流中的任何值。有了 Flow,我什至不确定这是否可能。

考虑案例:

observeFlow()
    .backpressureDrop() // non-existent operator, just for illustrative purposes
    .map { ... }
    .flatMapMerge { ... }
    .collect { ... }

因此,backpressureDrop 应该尊重在流中在下面完成的任何工作,而该操作员对下面发生的事情一无所知(没有来自底部的显式回调——比如 RxJava 订阅者中的“请求”方法)。因此,这似乎不太可能。在收集上一个项目之前,该操作员不应通过任何事件。

是否有任何我想念的现成运算符,或者是否有一种直接的方法可以用现有的 API 实现类似的东西?

4

3 回答 3

3

有没有一种简单的方法来实现这样的事情

取决于你的直截了当。这是我将如何做到的。

背压转化为协程世界中的程序化暂停和恢复。对于onBackpressureDrop,下游必须指示它已准备好接受一项并暂停它,而上游不应等待下游准备好。

您必须以无限制的方式消耗上游并将项目和终端事件移交给下游等待这些信号。

package hu.akarnokd.kotlin.flow.impl

import hu.akarnokd.kotlin.flow.Resumable
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.AbstractFlow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.collect
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference

@FlowPreview
internal class FlowOnBackpressureDrop<T>(private val source: Flow<T>)
 : AbstractFlow<T>() {
    @ExperimentalCoroutinesApi
    @InternalCoroutinesApi
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        coroutineScope {
            val consumerReady = AtomicBoolean()
            val producerReady = Resumable()
            val value = AtomicReference<T>()
            val done = AtomicBoolean()
            val error = AtomicReference<Throwable>();

            launch {
                try {
                    source.collect {
                        if (consumerReady.get()) {
                            value.set(it);
                            consumerReady.set(false);
                            producerReady.resume();
                        }
                    }
                    done.set(true)
                } catch (ex: Throwable) {
                    error.set(ex)
                }
                producerReady.resume()
            }

            while (true) {
                consumerReady.set(true)
                producerReady.await()

                val d = done.get()
                val ex = error.get()
                val v = value.getAndSet(null)

                if (ex != null) {
                    throw ex;
                }
                if (d) {
                    break;
                }

                collector.emit(v)
            }
        }
    }
}

注意:可恢复的实施。

所以让我们来看看实现。

首先,需要 5 个变量在上游的收集器和为下游工作的收集器之间传递信息: -consumerReady表示下游已准备好下一个项目, -producerReady表示生产者已存储下一个项目(或终端信号)和下游可以恢复 -value上游项目准备消费 -done上游已结束 -error上游已失败

接下来,我们必须为上游启动收集器,因为收集正在暂停,并且在完成之前不会让下游消费者循环运行。在这个收集器中,我们检查下游消费者是否准备好(通过consumerReady),如果是,则存储当前项目,清除准备就绪标志并通过 指示其可用性producerReady。清除consumerReady将阻止后续上游项目被存储,直到下游本身指示新的准备就绪。

当上游结束或崩溃时,我们设置doneorerror变量并指示生产者已发言。

在这launch { }部分之后,我们现在将代表下游收集器继续使用共享变量。

每一轮的第一件事是表明我们已准备好接受下一个值,然后等待生产者方发出信号,它已将下一个事件放入共享变量中。

接下来,我们从这些变量中收集值。我们急于完成或抛出错误,并且仅作为最后的手段将上游项目重新发送到下游收集器。

于 2020-01-26T11:28:20.430 回答
3

我们可以使用由Rendezvous Channel支持的 Flow 来构建它。

当容量为 0 - 它创建 RendezvousChannel。该通道根本没有任何缓冲区。只有当发送和接收调用及时(会合)时,元素才会从发送方传输到接收方,因此发送挂起直到另一个协程调用接收,接收挂起直到另一个协程调用发送。

集合通道没有缓冲区。因此,该通道的消费者需要暂停并等待下一个元素,以便将元素发送到该通道。我们可以利用这种特性来丢弃在没有通道暂停的情况下无法接受的值Channel.offer,这是一个正常的非暂停函数。

Channel.offer

如果可以在不违反容量限制的情况下立即将元素添加到此队列中并返回 true。否则,如果通道 isClosedForSend 则立即返回 false 或抛出异常(有关详细信息,请参阅关闭)。

因为channelFlow是缓冲的,所以我们需要向Flow<T>.buffer下游申请 0。

/**
 * Consume this [Flow] using a channelFlow with no buffer. Elements emitted from [this] flow
 * are offered to the underlying [channelFlow]. If the consumer is not currently suspended and 
 * waiting for the next element, the element is dropped. 
 * 
 * @return a flow that only emits elements when the downstream [Flow.collect] is waiting for the next element
 */
fun <T> Flow<T>.drop(): Flow<T> = channelFlow {
    collect { offer(it) }
}.buffer(capacity = 0)

这是一个慢消费者如何使用它来删除元素的示例。

fun main() = runBlocking {
    flow {
        (0..100).forEach {
            emit(it)
            delay(100)
        }
    }.drop().collect {
        delay(1000)
        println(it)
    }
}

与相应的输出:

0
11
21
31
41
51
61
71
81
91
于 2020-02-08T17:42:01.510 回答
1

根据 Anton Spaans的评论,有一种方法可以通过使用 channelFlow 来模拟 drop。
但问题是默认情况下channelFlow构建器使用BUFFER策略并且不允许参数化容量。
有一种方法可以在 ChannelFlowBuilder 中参数化容量,但问题是 API 是内部的并且ChannelFlowBuilder是私有的。
但本质上,如果复制粘贴ChannelFlowBuilder实现并创建这样的类:

class BackPressureDropFlow<T>(private val source: Flow<T>) : AbstractFlow<T>() {

    @InternalCoroutinesApi
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        ChannelFlowBuilder<T>({ source.collect { offer(it) } }, capacity = 0)
            .collect { collector.emit(it) }
    }
}

(或直接应用与变换类似的解决方案)。
然后它似乎工作。
这里的主要关键是使用capacity = 0,它表示下游将在收到的每个项目上暂停(因为没有缓冲容量)。

于 2020-02-08T08:24:04.517 回答