我正在尝试使用 SignalRef 中断 fs2 流。我使用以下内容设置并运行流。流应该在包含时运行,并在包含switch
时false
中断switch
true
import cats.effect.IO
import fs2.Stream
import fs2.concurrent.SignallingRef
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.DurationInt
implicit val contextShift = IO.contextShift(ExecutionContext.global)
implicit val timer = IO.timer(ExecutionContext.global)
val switch: IO[SignallingRef[IO, Boolean]] = SignallingRef[IO, Boolean](false)
val program: Stream[IO, Unit] = {
val program: Stream[IO, Unit] =
Stream
.repeatEval(IO{
println(java.time.LocalTime.now)
println(switch.map(_.get).unsafeRunSync.unsafeRunSync)
})
.metered(1.second)
program
.interruptWhen(Stream.repeatEval(switch.map(_.get).unsafeRunSync))
}
program.compile.drain.unsafeRunAsync(() => _)
然后我尝试用
switch.map(_.set(true).unsafeRunSync)
但是,流仍在继续。在标准输出中我看到
15:58:33.048504
false
15:58:34.048760
false
15:58:35.049063
false
15:58:36.049356
false
15:58:37.049615
false
所以显然它没有开始切换到 true 吗?