1

我试图了解猫效应是如何Cancelable运作的。根据文档,我有以下最小应用程序

import java.util.concurrent.{Executors, ScheduledExecutorService}
import cats.effect._
import cats.implicits._
import scala.concurrent.duration._

object Main extends IOApp {
  def delayedTick(d: FiniteDuration)
                 (implicit sc: ScheduledExecutorService): IO[Unit] = {

    IO.cancelable { cb =>
      val r = new Runnable {
        def run() =
          cb(Right(()))
      }
      val f = sc.schedule(r, d.length, d.unit)

      // Returning the cancellation token needed to cancel
      // the scheduling and release resources early
      val mayInterruptIfRunning = false
      IO(f.cancel(mayInterruptIfRunning)).void
    }
  }

  override def run(args: List[String]): IO[ExitCode] = {
    val scheduledExecutorService =
      Executors.newSingleThreadScheduledExecutor()
    for {
      x <- delayedTick(1.second)(scheduledExecutorService)
      _ <- IO(println(s"$x"))
    } yield ExitCode.Success
  }
}

当我运行这个:

❯ sbt run
[info] Loading global plugins from /Users/ethan/.sbt/1.0/plugins
[info] Loading settings for project stackoverflow-build from plugins.sbt ...
[info] Loading project definition from /Users/ethan/IdeaProjects/stackoverflow/project
[info] Loading settings for project stackoverflow from build.sbt ...
[info] Set current project to cats-effect-tutorial (in build file:/Users/ethan/IdeaProjects/stackoverflow/)
[info] Compiling 1 Scala source to /Users/ethan/IdeaProjects/stackoverflow/target/scala-2.12/classes ...
[info] running (fork) Main
[info] ()

程序此时就挂起。我有很多问题:

  1. 为什么程序在 1 秒后挂起而不是终止?
  2. 我们为什么要设置mayInterruptIfRunning = false?取消的全部目的不是中断正在运行的任务吗?
  3. 这是定义的推荐方法ScheduledExecutorService吗?我没有在文档中看到示例。
  4. 该程序等待 1 秒,然后返回()(然后意外挂起)。如果我想退回其他东西怎么办?例如,假设我想返回一个字符串,这是一些长时间运行的计算的结果。我将如何从中提取该值IO.cancelable?似乎困难在于IO.cancelable返回取消操作,而不是要取消的进程的返回值。

请原谅这篇长文,但这是我的build.sbt

name := "cats-effect-tutorial"

version := "1.0"

fork := true

scalaVersion := "2.12.8"

libraryDependencies += "org.typelevel" %% "cats-effect" % "1.3.0" withSources() withJavadoc()

scalacOptions ++= Seq(
  "-feature",
  "-deprecation",
  "-unchecked",
  "-language:postfixOps",
  "-language:higherKinds",
  "-Ypartial-unification")
4

3 回答 3

1

你需要关闭ScheduledExecutorService,试试这个

Resource.make(IO(Executors.newSingleThreadScheduledExecutor))(se => IO(se.shutdown())).use {
      se =>
        for {
          x <- delayedTick(5.second)(se)
          _ <- IO(println(s"$x"))
        } yield ExitCode.Success
    }
于 2020-07-15T18:04:28.790 回答
0
  1. 最后你需要明确terminate的执行程序,因为它不是由 Scala 或 Cats 运行时管理的,它不会自行退出,这就是你的应用程序举起而不是立即退出的原因。

  2. mayInterruptIfRunning = false如果线程正在运行,则优雅地终止它。您可以将其设置为true强制杀死它,但不建议这样做。

  3. 你有很多方法来创建一个ScheduledExecutorService,这取决于需要。对于这种情况,没关系,但问题1。

  4. 您可以通过 call 从 Cancelable IO 返回任何内容cb(Right("put your stuff here")),唯一阻止您检索返回A的是您的取消工作时。如果你在它到达重点之前停止它,你将一无所获。尝试返回IO(f.cancel(mayInterruptIfRunning)).delayBy(FiniteDuration(2, TimeUnit.SECONDS)).void,你会得到你所期望的。因为2 seconds > 1 second,您的代码在被取消之前有足够的时间运行。

于 2020-09-05T03:48:59.797 回答
0

我能够找到这些问题的答案,尽管仍有一些我不明白的事情。

为什么程序在 1 秒后挂起而不是终止?

由于某种原因,Executors.newSingleThreadScheduledExecutor()导致事情挂起。为了解决这个问题,我不得不使用Executors.newSingleThreadScheduledExecutor(new Thread(_)). 似乎唯一的区别是第一个版本等同于Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory()),尽管文档中没有任何内容明确说明为什么会这样。

我们为什么要设置mayInterruptIfRunning = false?取消的全部目的不是中断正在运行的任务吗?

我不得不承认我并不完全理解这一点。同样,文档并没有特别澄清这一点。将标志切换为true似乎根本不会改变行为,至少在Ctrl-c中断的情况下是这样。

这是定义 ScheduledExecutorService 的推荐方法吗?我没有在文档中看到示例。

显然不是。我想出的方法是受到猫效果源代码中的这段代码的粗略启发。

该程序等待 1 秒,然后返回()(然后意外挂起)。如果我想退回其他东西怎么办?例如,假设我想返回一个字符串,这是一些长时间运行的计算的结果。我将如何从中提取该值IO.cancelable?似乎困难在于IO.cancelable返回取消操作,而不是要取消的进程的返回值。

IO.cancellable { ... }块返回并且IO[A]回调cb函数具有类型Either[Throwable, A] => Unit。从逻辑上讲,这表明输入cb函数的任何内容都是 IO.cancellable 表达式将返回的内容(包装在 中IO)。所以要返回字符串"hello"而不是(),我们重写delayedTick

  def delayedTick(d: FiniteDuration)
                 (implicit sc: ScheduledExecutorService): IO[String] = { // Note IO[String] instead of IO[Unit]

    implicit val processRunner: JVMProcessRunner[IO] = new JVMProcessRunner
    IO.cancelable[String] { cb => // Note IO.cancelable[String] instead of IO[Unit]
      val r = new Runnable {
        def run() =
          cb(Right("hello")) // Note "hello" instead of ()
      }
      val f: ScheduledFuture[_] = sc.schedule(r, d.length, d.unit)
      IO(f.cancel(true))
    }
  }
于 2020-05-02T18:45:42.803 回答