2

以下代码仅打印 10000 即仅最后一个元素

val channel = BroadcastChannel<Int>(Channel.CONFLATED)
val flowJob = channel.asFlow().buffer(Channel.UNLIMITED).onEach {
    println(it)
}.launchIn(GlobalScope)

for (i in 0..100) {
    channel.offer(i*i)
}
flowJob.join()

代码可以在操场上运行。

但是由于 Flow 在单独的调度线程中启动,并且值被发送到 Channel 并且由于 Flow 具有无限的缓冲区,所以它应该接收每个元素直到调用 onEach。但是为什么只有最后一个元素能够被接收?

这是预期的行为还是一些错误?如果它的预期行为有人会尝试仅将最新元素推送到流中,但所有具有特定缓冲区的流都可以接收该元素。

4

3 回答 3

1

实际上,这是关于缓冲的“合并”方式。对于缓冲流,您有几种方法,例如使用buffer()方法或collectLatest()conflate()。他们每个人都有自己的缓冲方式。所以conflate()方法的方法是,当流发出值时,它会尝试收集,但是当收集器太慢时,conflate()为了流而跳过中间值。即使每次它在单独的协程中发出,它也会这样做。所以在一个频道中,类似的事情基本上正在发生。

这是官方文档解释:

当流表示操作或操作状态更新的部分结果时,可能不需要处理每个值,而只需处理最近的值。在这种情况下,当收集器处理它们的速度太慢时,可以使用合并运算符跳过中间值。

看看这个链接。

解释是针对流程的,但您需要专注于您正在使用的功能。在这种情况下,渠道和流量的合并是相同的。

于 2022-03-01T21:44:51.767 回答
0

正如一些评论所述, usingChannel.CONFLATED将仅存储最后一个值,并且您提供给channel,即使您的流程有缓冲区。

join()将暂停直到Job未完成,在您的情况下无限期,这就是您需要超时的原因。

 val channel = Channel<Int>(Channel.RENDEZVOUS)
 val flowJob = channel.consumeAsFlow().onEach {
     println(it)
 }.launchIn(GlobalScope)

GlobalScope.launch{
    for (i in 0..100) {
        channel.send(i * i)
    }
    channel.close()
}
flowJob.join()

查看此解决方案(游乐场链接),Channel.RENDEZVOUS您的频道仅在其他元素已被使用时才会接受新元素。这就是为什么我们必须使用send而不是offersend暂停直到它可以发送元素,同时offer返回一个布尔值,指示发送是否成功。最后,我们必须到close通道,以免join()暂停直到永恒。

于 2020-10-15T07:56:08.693 回答
0

这里的问题是Channel.CONFLATED. 取自文档:

Channel that buffers at most one element and conflates all subsequent `send` and `offer` invocations,
so that the receiver always gets the most recently sent element.
Back-to-send sent elements are _conflated_ -- only the the most recently sent element is received,
while previously sent elements **are lost**.
Sender to this channel never suspends and [offer] always returns `true`.

This channel is created by `Channel(Channel.CONFLATED)` factory function invocation.

This implementation is fully lock-free.

所以这就是为什么你只得到最新的(最后一个)元素。我会UNLIMITED Channel改用:

val channel = Channel<Int>(Channel.UNLIMITED)
val flowJob = channel.consumeAsFlow().onEach {
    println(it)
}.launchIn(GlobalScope)

for (i in 0..100) {
    channel.offer(i*i)
}
flowJob.join()
于 2020-10-14T13:48:36.063 回答