我正在寻找一种方式Task
(即外部范围)可以使用或等效的方式执行“子任务” flatMap
,并确保外部范围中的任何后续链接调用都使用原始调度程序。
使用的库和scala:
- 斯卡拉 - 2.12.4
- 莫尼克斯 -
"io.monix" %% "monix" % "3.0.0-RC1"
- 猫 -
"org.typelevel" %% "cats-core" % "1.0.1"
示例代码:
import monix.eval.Task
import monix.execution.Scheduler
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import monix.execution.Scheduler.Implicits.global
import cats.implicits._
object Test extends App {
val io1 = Scheduler.io("io1")
val io2 = Scheduler.io("io2")
def taskEval(name: String) = Task.eval(println(s"Running eval Task [$name] on thread [${Thread.currentThread().getName}]"))
def subTask: Task[Unit] = {
taskEval("subTaskScope").executeOn(io2)
}
def outerScope(sub: Task[Unit]): Task[Unit] = {
taskEval("outerScopeBefore") *> sub *> taskEval("outerScopeAfter")
}
def outerScopeTryProtect(sub: Task[Unit]): Task[Unit] = {
taskEval("outerScopeBefore") *> (sub <* Task.shift) *> taskEval("outerScopeAfter")
}
val program1 = taskEval("programBefore").executeOn(io1) *> outerScope(subTask) *> taskEval("programAfter")
val program2 = taskEval("programBefore").executeOn(io1) *> outerScopeTryProtect(subTask) *> taskEval("programAfter")
Await.result(program1.runAsync, Duration.Inf)
// Running eval Task [programBefore] on thread [io1-573]
// Running eval Task [outerScopeBefore] on thread [io1-573]
// Running eval Task [subTaskScope] on thread [io2-574]
// Running eval Task [outerScopeAfter] on thread [io2-574] // << we don't shift back so we are stuck with the scheduler that is forces by subTask
// Running eval Task [programAfter] on thread [io2-574]
println("------")
// Running eval Task [programBefore] on thread [io1-573]
// Running eval Task [outerScopeBefore] on thread [io1-573]
// Running eval Task [subTaskScope] on thread [io2-574]
// Running eval Task [outerScopeAfter] on thread [scala-execution-context-global-575] // we shift the scheduler but this restores the default scheduler
// Running eval Task [programAfter] on thread [scala-execution-context-global-575]
Await.result(program2.runAsync, Duration.Inf)
}
该subTask
方法希望在专用调度程序 ( io2
) 上执行一些异步工作,因此它强制使用调度程序的异步边界executeOn
。
该outerScope
方法正在某个程序中执行,program1
它使用. 由于它没有任何明确的异步边界,如果碰巧更改了调度程序(它确实如此),其余的将使用由. 出于这个原因,调用在调度程序上执行。sub
subTask
flatMap
subTask
outerScope
subTask
taskEval("outerScopeAfter")
io2
试图通过在ped (ie )之后outerScopeTryProtect
引入异步边界 (using) 来保护它使用的调度程序。但是,异步边界 ( ) 将调度程序重置为默认调度程序,在这种情况下,默认调度程序将一直返回到. 这不是我们想要的,因为我们想回到调用时使用的调度程序,即调度程序。Task.shift
flatMap
sub
subTask
Task.shift
program2.runAsync
taskEval("outerScopeBefore")
io1
我正在寻找的是类似这样的东西Task[A].flatMap[B](f: A => Task[B]): Task[B]
,它将f
以任何方式执行f
指定的任务(可能使用不同的调度程序),但调用的结果Task
将flatMap
返回Task[A]
到flatMap
.