有据可查的是,与空合并fs2.Stream
应该产生相同的fs2.Stream
. 这是Scaladocs的引文:
拥有的财产
merge(Stream.empty, s) == s
考虑以下完整的 Scala 程序fs2.Stream
:
发射元素
import scala.concurrent.duration._
import cats.effect.{ContextShift, IO, Timer}
import cats.syntax.flatMap._
import cats.effect.concurrent.Ref
import scala.concurrent.ExecutionContext
object TestFs2 extends App {
implicit val timerIo: Timer[IO] = IO.timer(ExecutionContext.global)
implicit val concurrentIo: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
val program = Ref.of[IO, Int](0).map(ref => {
fs2.Stream.repeatEval(ref.get).evalMap(value => {
IO(println(s"Got value $value")) >> IO.sleep(1.second) >> ref.set(value + 1)
})
})
program.flatMap(_.compile.drain).unsafeRunSync()
}
该程序打印以下内容:
Got value 0
Got value 1
Got value 2
...
它看起来不错。现在应用上面的引用Scaladoc
,我得出结论,替换
fs2.Stream.repeatEval(ref.get)
和
fs2.Stream.repeatEval(ref.get).merge(fs2.Stream.empty.covaryAll[IO, Int])
行为应该是相同的。这是更新的程序:
发射元素并与空 fs2.Stream 合并
import scala.concurrent.duration._
import cats.effect.{ContextShift, IO, Timer}
import cats.syntax.flatMap._
import cats.effect.concurrent.Ref
import scala.concurrent.ExecutionContext
object TestFs2 extends App {
implicit val timerIo: Timer[IO] = IO.timer(ExecutionContext.global)
implicit val concurrentIo: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
val program = Ref.of[IO, Int](0).map(ref => {
fs2.Stream.repeatEval(ref.get).merge(fs2.Stream.empty.covaryAll[IO, Int]).evalMap(value => {
IO(println(s"Got value $value")) >> IO.sleep(1.second) >> ref.set(value + 1)
})
})
program.flatMap(_.compile.drain).unsafeRunSync()
}
程序输出为
Got value 0
Got value 0
Got value 1
Got value 1
Got value 2
Got value 2
Got value 3
Got value 3
...
问题:为什么与空fs2.Stream
更改合并程序的行为会导致复制原始元素fs2.Stream
?