2

我正在尝试使用 SignalRef 中断 fs2 流。我使用以下内容设置并运行流。流应该在包含时运行,并在包含switchfalse中断switchtrue

  import cats.effect.IO
  import fs2.Stream
  import fs2.concurrent.SignallingRef

  import scala.concurrent.ExecutionContext
  import scala.concurrent.duration.DurationInt

  implicit val contextShift = IO.contextShift(ExecutionContext.global)
  implicit val timer = IO.timer(ExecutionContext.global)

  val switch: IO[SignallingRef[IO, Boolean]] = SignallingRef[IO, Boolean](false)

  val program: Stream[IO, Unit] = {

      val program: Stream[IO, Unit] =
        Stream
          .repeatEval(IO{
            println(java.time.LocalTime.now)
            println(switch.map(_.get).unsafeRunSync.unsafeRunSync)
          })
          .metered(1.second)

      program
        .interruptWhen(Stream.repeatEval(switch.map(_.get).unsafeRunSync))
    }
  
  program.compile.drain.unsafeRunAsync(() => _)

然后我尝试用

switch.map(_.set(true).unsafeRunSync)

但是,流仍在继续。在标准输出中我看到

15:58:33.048504
false
15:58:34.048760
false
15:58:35.049063
false
15:58:36.049356
false
15:58:37.049615
false

所以显然它没有开始切换到 true 吗?

4

2 回答 2

4

您的代码有几个问题。

首先,请检查以下人员的签名switch

val switch: IO[SignallingRef[IO, Boolean]] = SignallingRef[IO, Boolean](false)

的类型SignallingRef被包装到IO. 这意味着 new 的创建SignallingRef被暂停,直到IOmonad 被评估(通过IO程序流隐式或通过调用显式unsafeRunXXX)。所以这个值的更合适的名称可能是createSwitch.

当您switch.map(_.get).unsafeRunSync实际上每次SignallingRef使用其默认值创建一个新实例时false,它永远不会被评估为真。

经验法则是,unsafeRunXXX在您完成 IO/Stream 程序的组装之前,您应该(几乎)永远不要调用方法,然后您应该运行这种方法一次。

正确的做法是创建switch一次以供理解,然后您可以将其program作为参数传递给它。

我对您的代码进行了一些重构,以完成我认为您打算做的事情,并添加了一些澄清注释。

import cats.effect.IO
import fs2.Stream
import fs2.concurrent.SignallingRef

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.DurationInt

implicit val contextShift = IO.contextShift(ExecutionContext.global)
implicit val timer = IO.timer(ExecutionContext.global)

//I changed name to createSwitch, which I think reflect reality more
val createSwitch: IO[SignallingRef[IO, Boolean]] = SignallingRef[IO, Boolean](false)

val program: Stream[IO, Unit] = {

  //I pass here switch as method's param
  def program(switch: SignallingRef[IO, Boolean]): Stream[IO, Unit] =
    Stream
      .repeatEval {
        for { //I used for-comprehension to split IO into 3 statements
          switchValue <- switch.get //here I get value of switch
          _ <- IO(println(java.time.LocalTime.now)) //I split println into 2 separate statements
          _ <- IO(println(switchValue)) //because it's not a good practive to run 2 effect in single IO
        } yield ()
      }
      .metered(1.second)

  for {
    switch <- Stream.eval(createSwitch)
    //here I create effect to set switch to true after 10 seconds and then use start to run it
    //separate fiber in background. If I didn't do that it would just wait 10 sec and only then run program
    _ <- Stream.eval(switch.set(true).delayBy(10.seconds).start)
    _ <- program(switch).interruptWhen(switch)
  } yield ()

}

program.compile.drain.unsafeRunSync()
于 2020-11-29T22:43:06.100 回答
3

就个人而言,我使用自己的终止开关来处理类似的事情:

final case class KillSwitch[F[_]](stream: Stream[F, Unit], switch: F[Unit])
object KillSwitch {

  def apply[F[_]: Sync]: F[KillSwitch[F]] =
    Ref.of(true).map(switch => KillSwitch(Stream.repeatEval(switch.get).takeWhile(identity).void, switch.set(false)))
}

我或多或少地使用它:

for {
  KillSwitch(kfStream, switch) <- KillSwitch[F]
  streamFiber <- yourStream.zip(kfStream).map(_._1).compile.drain.start
  // after a while
  _ <- switch // closes kfStream
  result <- streamFiber.join
} yield result

(假设这是一个伪代码来展示这个想法)。

于 2020-11-29T03:03:38.220 回答