我是 cat-effect 库的新手,我遇到了并行执行的问题。我有一个我认为会受益的应用程序,但是当我在玩具构造上测试这个想法时,我似乎看不到执行时间的差异。我觉得我一定错过了一些对其他人来说很明显的东西,所以我想我会试试运气。在下面的代码中,我有两个数字序列(addInSequence
和addInParallel
)求和的实现,两者都在run()
函数中执行。当我运行程序时,我注意到它们的运行时间几乎相同。我错过了什么明显的东西吗?
import cats.Monoid
import cats.effect.{ExitCode, IO, IOApp}
import cats.implicits._
import scala.concurrent.duration.{FiniteDuration, TimeUnit}
case class Result[A](value: A, secondsElapsed: Double)
object Result {
def total[A](results: Seq[Result[A]])(implicit mon: Monoid[A]): Result[A] = {
val out: Result[A] = results.foldLeft(Result.empty[A]) {
(out: Result[A], next: Result[A]) =>
val newValue: A = mon.combine(out.value, next.value)
val aggTime: Double = out.secondsElapsed + next.secondsElapsed
Result(newValue, aggTime)
}
out
}
def empty[A](implicit mon: Monoid[A]): Result[A] = Result(mon.empty, 0d)
implicit val intAddMon: Monoid[Int] = new Monoid[Int] {
override def empty: Int = 0
override def combine(x: Int, y: Int): Int = x + y
}
}
object ParallelMap extends IOApp {
def slowAdd(nums: Seq[Int]): Int = nums.foldLeft(0) {
(out: Int, next: Int) =>
val seconds: TimeUnit = java.util.concurrent.TimeUnit.SECONDS
val delay: IO[Unit] = IO.sleep(FiniteDuration(1L, seconds))
delay.unsafeRunSync()
out + next
}
def timeIt[A](op: => A): Result[A] = {
val start: Double = System.nanoTime / 1e9
val out: A = op
val stop: Double = System.nanoTime / 1e9
Result(out, stop - start)
}
def addInSequence(first: Seq[Int], second: Seq[Int], third: Seq[Int]): IO[Result[Int]] = {
val partialSums: Seq[Result[Int]] = Seq(first, second, third).map( ns => timeIt(slowAdd(ns)) )
IO(Result.total(partialSums))
}
def addInParallel(first: Seq[Int], second: Seq[Int], third: Seq[Int]): IO[Result[Int]] = {
val ioSeq: List[IO[Result[Int]]] = List(first, second, third).map( ns => IO(timeIt(slowAdd(ns))) )
val sums: IO[List[Result[Int]]] = ioSeq.parSequence
for {
partialSums <- sums
} yield Result.total(partialSums)
}
override def run(args: List[String]): IO[ExitCode] = {
val nums: Seq[Int] = 1 to 4
val results: IO[Seq[(String, Result[Int])]] = for {
serial <- addInSequence(nums, nums, nums)
parallel <- addInParallel(nums, nums, nums)
} yield Seq(("Serial", serial), ("Parallel", parallel))
val report: IO[Unit] = results.map(println)
report.unsafeRunSync()
IO(ExitCode.Success)
}
}
看起来我应该看到运行时间减少到三分之一,但我必须以某种方式限制并行执行的能力。但是,文档似乎没有建议需要任何额外的设置,也没有我遇到的任何其他示例。任何想法将不胜感激。