4

Monix 使用 Ack 同步发出的消息,但如果我使用 groupBy 和 flatMap,内部 Observable 不会跟随source.

请参阅此测试代码:

import java.util.concurrent.TimeUnit

import monix.execution.Scheduler.Implicits.global
import monix.execution.Ack.Continue
import monix.reactive.{Observable, OverflowStrategy}
import org.junit.Test


class MonixBackpressureWithGroupByTest2 {
  @Test
  def test(): Unit = {
    val source = Observable.range(0,130)

    val backPressuredStream = source.map(x => {
        println("simple log first  map - " + x)
        x
      })
      .asyncBoundary(OverflowStrategy.BackPressure(5))
      .map { i =>

        println("after backpressure map, and Rim 3 operation of source - " + ((i % 3) toString) -> i)
        ((i % 3) toString) -> i
      }
      .groupBy{case (k, v) => k}
      .flatMap(x => {
        val mapWithSleep = x.map{case groupedMsg@(key, value) =>
          Thread.sleep(2000)
          println("inner Observable after group by rim 3. sleep 2 second for every message - " + groupedMsg)
          groupedMsg
        }

        mapWithSleep

      })

    backPressuredStream.share.subscribe(
      (keyAndValue: (String, Long)) => Continue
    )

    global.scheduleWithFixedDelay(0L, 1000L, TimeUnit.MILLISECONDS, () => {
      println("========sleep 1 second ============")
    })

    Thread.currentThread().join()

  }

}

输出:

...

========sleep 1 second ============
inner Observable after group by rim 3. sleep 2 second for every message - (0,72)
(after backpressure map, and Rim 3 operation of source - 1,73)
(after backpressure map, and Rim 3 operation of source - 2,74)
(after backpressure map, and Rim 3 operation of source - 0,75)
========sleep 1 second ============
========sleep 1 second ============
inner Observable after group by rim 3. sleep 2 second for every message - (0,75)
(after backpressure map, and Rim 3 operation of source - 1,76)
(after backpressure map, and Rim 3 operation of source - 2,77)
(after backpressure map, and Rim 3 operation of source - 0,78)
========sleep 1 second ============
========sleep 1 second ============
inner Observable after group by rim 3. sleep 2 second for every message - (0,78)
(after backpressure map, and Rim 3 operation of source - 1,79)
...

其中出现一些背压不匹配:
after: sleep 2 second for every message ...backpressure give 3 of itemafter backpressure map - ...

就背压而言, 怎么可能sleep 2 second for every message ...有一对一的关系?after backpressure map - ...

还有一个疑惑:为什么要记录sleep 2 second for every message 输出(0, 72), (0, 75), (0,78)但这样的事情(0, 72), (1, 73), (2,74)

谢谢。

莫尼克斯版本: "io.monix" %% "monix" % "3.0.0-RC1"

4

1 回答 1

2

您看到的行为正是您所期望的。

为了快速总结你的应用程序做了什么,让我用我的话来解释一下:


你有一个Observable生成数字,并为每个元素做一些副作用。

接下来,按 对元素进行分组_ % 3

接下来,您在每个组的Observable.

然后,你flatMap每组的Observable,产生一个单一的,平坦的Observable


那么为什么你一开始只看到第一组(哪里_ % 3 == 0)打印到控制台?***

答案在于flatMap:查看文档Observable,您会发现以下描述flatMap

final def flatMap[B](f: (A) ⇒ Observable[B]): Observable[B]

Alias for concatMap.

[...]

Observables 就像你想 s 一样List:当你连接Lists 时,你最终会得到一个List包含 first 的元素List,然后是 second 的元素的单个List,依此类推。

在 Monix 中,Observable通过等待(read: ) 操作中的第一个Observable生成来发送“已完成”- 信号来实现相同的行为。只有这样,第二个才会被消耗,以此类推。flatMapconcatMapObservable

或者,简单地说,flatMap关心产生Observable的 s 的顺序。

但是你的Observable操作中的 s什么时候“完成”?为此,我们必须了解其工作原理——因为这就是它们的来源。 flatMapgroupBy

为了groupBy工作,尽管Observables 被懒惰地评估,它必须将传入的元素存储在缓冲区中。我对此不是 100% 确定的,但是如果groupBy像我认为的那样工作,对于任何Observable拉下一个元素的分组,它将无限期地遍历原始元素,Observable直到找到属于该组的元素,保存所有先前的 (但还不是必需的)属于该缓冲区中其他组的元素以供以后使用。

所有这一切意味着groupBy在源发出完成信号之前无法知道是否已找到组的所有元素Observable,然后它将使用所有剩余的缓冲元素,然后向分组Observable的 s 发出完成信号。

简而言之:Observable直到groupBy源完成后才Observable完成。

将所有这些信息汇总在一起时,您会明白只有当源 Observable (your Observable.range(0, 130)) 完成时,第一个 groupedObservable也将完成,因为flatMap只有这样,所有其他 grouped Observables 才会被使用。

因为我从您的最后一个问题中知道您正在尝试构建一个 Web 套接字,所以使用flatMap是一个坏主意 - 您Observable的传入请求源永远不会完成,实际上只为您遇到的第一个 IP 地址提供服务。

你必须做的是使用mergeMap. 当比较时concatMap mergeMap不关心元素的顺序,而是“先到先得” - 规则适用。


*** :当您完成我的解释并希望了解其原理groupByflatMap工作原理时,您就会明白我为什么要写“开头”!

于 2019-04-26T12:35:51.967 回答