3

我正在尝试理解 Monix 中的任务调度原则。以下代码(来源:https ://slides.com/avasil/fp-concurrency-scalamatsuri2019#/4/3 )只产生'1',正如预期的那样。

  val s1: Scheduler = Scheduler(
    ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor()),
    ExecutionModel.SynchronousExecution)

  def repeat(id: Int): Task[Unit] =
    Task(println(s"$id ${Thread.currentThread().getName}")) >> repeat(id)

  val prog: Task[(Unit, Unit)] = (repeat(1), repeat(2)).parTupled

  prog.runToFuture(s1)

  // Output:
  // 1 pool-1-thread-1
  // 1 pool-1-thread-1
  // 1 pool-1-thread-1
  // ...

当我们添加Task.sleeprepeat方法中

  def repeat(id: Int): Task[Unit] =
    Task(println(s"$id ${Thread.currentThread().getName}")) >>
      Task.sleep(1.millis) >> repeat(id)

输出变为

// Output
// 1 pool-1-thread-1
// 2 pool-1-thread-1
// 1 pool-1-thread-1
// 2 pool-1-thread-1
// ...

这两个任务现在在一个线程上同时执行!很好:) 一些合作让步已经开始了。到底发生了什么?谢谢 :)

编辑:同样的情况发生在Task.shift而不是Task.sleep.

4

2 回答 2

1

我不确定这是否是您正在寻找的答案,但它是这样的:

尽管命名另有说明,Task.sleep但不能与更传统的方法(如Thread.sleep.

Task.sleep实际上并不在线程上运行,而是简单地指示调度程序在经过的时间后运行回调。

monix/TaskSleep.scala这是一个用于比较的小代码片段:

[...]

implicit val s = ctx.scheduler
val c = TaskConnectionRef()
ctx.connection.push(c.cancel)

c := ctx.scheduler.scheduleOnce(
  timespan.length,
  timespan.unit,
  new SleepRunnable(ctx, cb)
)

[...]

private final class SleepRunnable(ctx: Context, cb: Callback[Throwable, Unit]) extends Runnable {

  def run(): Unit = {
    ctx.connection.pop()
    // We had an async boundary, as we must reset the frame
    ctx.frameRef.reset()
    cb.onSuccess(())
  }
}

[...]

在执行回调(此处:)之前的期间cb,您的单线程调度程序(此处:)ctx.scheduler可以简单地使用他的线程进行接下来排队的任何计算。

这也解释了为什么这种方法更可取,因为我们不会在睡眠间隔期间阻塞线程——浪费更少的计算周期。

希望这可以帮助。

于 2019-09-10T12:16:04.907 回答
0

扩展马库斯的答案。

作为一个心智模型(用于说明目的),您可以将线程池​​想象成一个堆栈。由于您只有一个执行程序线程池,它会尝试先运行repeat1,然后再运行repeat2.

在内部,一切都只是一个巨人FlatMap。运行循环将根据执行模型调度所有任务。

发生的事情是,sleep将一个可运行的对象调度到线程池。它将 runnable ( ) 推repeat1到堆栈的顶部,因此有机会repeat2运行。同样的事情也会发生repeat2

请注意,默认情况下,Monix 的执行模型将为每 1024 个平面图执行一个异步边界。

于 2019-11-16T06:29:07.270 回答