1

我正在寻找一种方式Task(即外部范围)可以使用或等效的方式执行“子任务” flatMap,并确保外部范围中的任何后续链接调用都使用原始调度程序。

使用的库和scala:

  • 斯卡拉 - 2.12.4
  • 莫尼克斯 -"io.monix" %% "monix" % "3.0.0-RC1"
  • 猫 -"org.typelevel" %% "cats-core" % "1.0.1"

示例代码:

import monix.eval.Task
import monix.execution.Scheduler
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import monix.execution.Scheduler.Implicits.global
import cats.implicits._

object Test extends App {
  val io1 = Scheduler.io("io1")
  val io2 = Scheduler.io("io2")

  def taskEval(name: String) = Task.eval(println(s"Running eval Task [$name] on thread [${Thread.currentThread().getName}]"))

  def subTask: Task[Unit] = {
    taskEval("subTaskScope").executeOn(io2)
  }

  def outerScope(sub: Task[Unit]): Task[Unit] = {
    taskEval("outerScopeBefore") *> sub *> taskEval("outerScopeAfter")
  }

  def outerScopeTryProtect(sub: Task[Unit]): Task[Unit] = {
    taskEval("outerScopeBefore") *> (sub <* Task.shift) *> taskEval("outerScopeAfter")
  }

  val program1 = taskEval("programBefore").executeOn(io1) *> outerScope(subTask) *> taskEval("programAfter")
  val program2 = taskEval("programBefore").executeOn(io1) *> outerScopeTryProtect(subTask) *> taskEval("programAfter")

  Await.result(program1.runAsync, Duration.Inf)
// Running eval Task [programBefore] on thread [io1-573]
// Running eval Task [outerScopeBefore] on thread [io1-573]
// Running eval Task [subTaskScope] on thread [io2-574]
// Running eval Task [outerScopeAfter] on thread [io2-574] // << we don't shift back so we are stuck with the scheduler that is forces by subTask
// Running eval Task [programAfter] on thread [io2-574]

  println("------")
// Running eval Task [programBefore] on thread [io1-573]
// Running eval Task [outerScopeBefore] on thread [io1-573]
// Running eval Task [subTaskScope] on thread [io2-574]
// Running eval Task [outerScopeAfter] on thread [scala-execution-context-global-575] // we shift the scheduler but this restores the default scheduler
// Running eval Task [programAfter] on thread [scala-execution-context-global-575]

  Await.result(program2.runAsync, Duration.Inf)
}

subTask方法希望在专用调度程序 ( io2) 上执行一些异步工作,因此它强制使用调度程序的异步边界executeOn

outerScope方法正在某个程序中执行,program1它使用. 由于它没有任何明确的异步边界,如果碰巧更改了调度程序(它确实如此),其余的将使用由. 出于这个原因,调用在调度程序上执行。subsubTaskflatMapsubTaskouterScopesubTasktaskEval("outerScopeAfter")io2

试图通过在ped (ie )之后outerScopeTryProtect引入异步边界 (using) 来保护它使用的调度程序。但是,异步边界 ( ) 将调度程序重置为默认调度程序,在这种情况下,默认调度程序将一直返回到. 这不是我们想要的,因为我们想回到调用时使用的调度程序,即调度程序。Task.shiftflatMapsubsubTaskTask.shiftprogram2.runAsynctaskEval("outerScopeBefore")io1

我正在寻找的是类似这样的东西Task[A].flatMap[B](f: A => Task[B]): Task[B],它将f以任何方式执行f指定的任务(可能使用不同的调度程序),但调用的结果TaskflatMap返回Task[A]flatMap.

4

0 回答 0