6

我知道我可以使用

import zio.Task

def zip3Par[A, B, C](a: Task[A], b: Task[B], c: Task[C]): Task[(A, B, C)] =
  a.zipPar(b).zipWithPar(c) { case ((a, b), c) => (a, b, c) }

def zip4Par[A, B, C, D](a: Task[A], b: Task[B], c: Task[C], d: Task[D]): Task[(A, B, C, D)] =
  zip3Par(a, b, c).zipWithPar(d) { case ((a, b, c), d) => (a, b, c, d) }

并行执行 3 或 4 个任务,但如果有更优雅的解决方案,我会感到困惑吗?

4

3 回答 3

6

您可以只使用ZIO.collectAllPar任务列表:

def collectTasks(tasks: Task[Int]*):Task[List[Int]] = ZIO.collectAllPar(tasks)

然后你可以像这样使用它:

val t1 = Task.effect{
  Thread.sleep(100)
  println("t1 started")
  Thread.sleep(1000)
  1
}

val t2 = Task.effect{
  println("t2 started")
  Thread.sleep(1000)
  2
}


val t3 = Task.effect{
  println("t3 started")
  Thread.sleep(1000)
  3
}

(new DefaultRuntime() {}).unsafeRun(collectTasks(t1,t2,t3))

它将同时运行您的所有任务。

使用元组而不是列表的通用解决方案在没有无形的Scala 2中很难实现。它会在Scala 3中发生变化,因为那时它们可以作为异构列表处理。

于 2019-07-13T09:19:33.103 回答
5

另请注意,有<&>组合器。这是 的别名zipPar。这将产生一个元组,如果您使用 for 理解,我建议您看看better-monadic-for哪个修复了 for 理解中的元组问题

<&>下面是一个将组合器与 map 结合使用的示例:

(t1 <&> t2 <&> t3 <&> t4) map { case i1 <*> i2 <*> i3 <*> i4 => s"$i1, $i2, $i3, $i4" }

ZIO.collectAllPar并且ZIO.collectAllParN仅在所有ZIO返回类型都相同时才有效。那不是问题。

于 2019-07-13T10:46:54.653 回答
3

添加到 Krzysztof Atłasik 的回答中,还有collectAllParN,它的工作方式与 collectAllPAr 类似,但允许您指定要使用的最大光纤数:

 val a = Task {
      println("t1 started")
      Thread.sleep(2000)
      println("t1 finished")
      1
    }
    val b = Task {
      println("t2 started")
      Thread.sleep(1000)
      println("t2 finished")
      2
    }
    val c = Task {
      println("t3 started")
      Thread.sleep(3000)
      println("t3 finished")
      3
    }
    val d = Task {
      println("t4 started")
      Thread.sleep(1000)
      println("t4 finished")
      4
    }

你可以这样运行它:

 Task.collectAllParN(4)(List(a, b, c, d))

如果您有许多(数百或数千)并行任务,这将特别有用,因此您可以避免溢出和内存错误。继续将要使用的光纤数量更改为 2 或 3,并亲自查看执行情况如何变化。

并行执行的另一种选择是将任务放在ZQueue上,并在您的消费者收到它们后分叉它们。

于 2019-07-13T09:43:44.473 回答