5

有据可查的是,与空合并fs2.Stream应该产生相同的fs2.Stream. 这是Scaladocs的引文:

拥有的财产merge(Stream.empty, s) == s

考虑以下完整的 Scala 程序fs2.Stream

发射元素

import scala.concurrent.duration._
import cats.effect.{ContextShift, IO, Timer}
import cats.syntax.flatMap._
import cats.effect.concurrent.Ref

import scala.concurrent.ExecutionContext

object TestFs2 extends App {
  implicit val timerIo: Timer[IO] = IO.timer(ExecutionContext.global)
  implicit val concurrentIo: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  val program = Ref.of[IO, Int](0).map(ref => {
    fs2.Stream.repeatEval(ref.get).evalMap(value => {
      IO(println(s"Got value $value")) >> IO.sleep(1.second) >> ref.set(value + 1)
    })
  })

  program.flatMap(_.compile.drain).unsafeRunSync()
}

该程序打印以下内容:

Got value 0
Got value 1
Got value 2
...

它看起来不错。现在应用上面的引用Scaladoc,我得出结论,替换

fs2.Stream.repeatEval(ref.get)

fs2.Stream.repeatEval(ref.get).merge(fs2.Stream.empty.covaryAll[IO, Int])

行为应该是相同的。这是更新的程序:

发射元素并与空 fs2.Stream 合并

import scala.concurrent.duration._
import cats.effect.{ContextShift, IO, Timer}
import cats.syntax.flatMap._
import cats.effect.concurrent.Ref

import scala.concurrent.ExecutionContext

object TestFs2 extends App {
  implicit val timerIo: Timer[IO] = IO.timer(ExecutionContext.global)
  implicit val concurrentIo: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  val program = Ref.of[IO, Int](0).map(ref => {
    fs2.Stream.repeatEval(ref.get).merge(fs2.Stream.empty.covaryAll[IO, Int]).evalMap(value => {
      IO(println(s"Got value $value")) >> IO.sleep(1.second) >> ref.set(value + 1)
    })
  })

  program.flatMap(_.compile.drain).unsafeRunSync()
}

程序输出为

Got value 0
Got value 0
Got value 1
Got value 1
Got value 2
Got value 2
Got value 3
Got value 3
...

问题:为什么与空fs2.Stream更改合并程序的行为会导致复制原始元素fs2.Stream

4

2 回答 2

4

的文档merge还说:

在等待结果流消耗它之前,实现总是尝试从每一侧提取一个块。因此,可能有多达两个块(每个流一个)等待处理,而结果流是处理元素。

如果我理解正确,这意味着当结果流忙于处理 value0时,在更新之前已经从源中提取了一个新值ref

严格来说,我不认为这种行为违反了任何不变量。但对你来说,这很重要,因为

  • 您的流改变了它从中提取的源
  • 您的源流始终准备好发出元素

要解决第二点,您可以使用 1 元素队列而不是Ref.

AFAICT 不使用merge. 在处理它们之前,流可以自由地从源中提取尽可能多的元素,只要源可以发出它们。您的第一段代码基本上很幸运,因为您有一个非常简单的流,其中包含 1 个元素块。

于 2020-08-14T09:13:06.890 回答
1

原来是一个错误

mpilquist评论中将行为背后的原因描述为

它从源流中提取下一个块,然后获取信号量许可,该信号量许可被阻塞,直到从队列中处理前一个块。因此,它总是提前读取 1 个块。

按照mpilquist的建议,我创建了一个拉取请求来修复刚刚合并的问题。

于 2020-08-15T20:59:08.073 回答