1

我有一个下一个任务,有 N db 查询(例如 3 - Seq(10,20,30))和迭代计数 = 4。我想使用 ZIO 和下一个:顺序执行迭代和内部迭代并行评估效果。

简化的代码如下所示

import zio._
import zio.console._

case class Res(iterNum :Int, dataValue :Int)

val execUnpure :(Int,Int) => Res = (iterNum,dataValue) =>
 Res(iterNum, dataValue)

val exec :(Int,Int) => Task[Res] = (iterNum,dataValue) =>
  Task.succeed(Res(iterNum, dataValue))

val evalEffectsParallel: (Int, Seq[Int]) => Task[Seq[Res]] =
  (iterNum, sqLoadConf) =>
    ZIO.collectAllPar(
      sqLoadConf.map(lc => 
        for {
          //here I open NEW db session for this exec.
          tr: Res <- exec(iterNum, lc)
        } yield tr

      )
    )

val seqParallelExec: (Int, Seq[Int]) => Task[Seq[Res]] =
  (iterNum, sqLoadConf) =>
    for {
      sqTestResults: List[Seq[Res]] <-
      IO.collectAll(
        (1 to iterNum).map(thisIter => evalEffectsParallel(thisIter, sqLoadConf))
      )
      r <- Task(sqTestResults.flatten)
    } yield r

val program : Int => Task[Seq[Res]] = iterationCount => 
      for {
            res <- seqParallelExec(iterationCount, Seq(10,20,30))
          } yield res

(new zio.DefaultRuntime {}).unsafeRun(
  for {
    sp <- program(4)
    _  <- putStrLn(s"seqpar = ${sp.toString}")
  } yield ()
)

此代码有效并返回 seqpar = List(Res(1,10), Res(1,20), Res(1,30), Res(2,10), Res(2,20), Res(2,30), Res(3,10), Res(3,20), Res(3,30))

但是每个效果(exec : Task[Res])都是并行执行的。

看起来 IO.collectAll 或 sqTestResults.flatten 在这里不合适。

在真正的代码中,在“这里我为这个执行者打开新的数据库会话”的地方。我打开到 Postgres 的新 jdbc 连接。当我运行此代码并从 pg_stat_activity 监视活动会话时,我看到该应用程序产生了很多会话,计数等于 4*3=12。

但我的期望是在单次迭代中看到大约 3 个会话,下一次会有更多新的 3 个会话和新的 3 个会话。

Adam Fraser 的解决方案按预期工作,但我为我简化了它。

  private val execute: (Int, PgConnectProp, PgLoadConf) => Task[PgTestResult] =
    (iterNum, dbConProps, lc) =>
      (new PgConnection).sess(iterNum,dbConProps).flatMap(thisSess => PgTestExecuter.exec(iterNum, thisSess, lc))

  private val executeSession :(Int, PgConnectProp,  Seq[PgLoadConf]) => Task[Seq[PgTestResult]] =
    (iteration, dbConProps, sqLoadConf) =>
      Task.foreachPar(sqLoadConf)(lc => execute(iteration, dbConProps, lc))

  private val seqparExec: (PgRunProp, PgConnectProp, Seq[PgLoadConf]) => Task[Seq[PgTestResult]] =
    (runProperties, dbConProps, sqLoadConf) =>
      Task.foreach(List.range(1, runProperties.repeat + 1)) {
        iteration => executeSession(iteration, dbConProps, sqLoadConf)
      }
        .map(_.flatten)

和 3 次迭代的结果。

 iter : [1] test : [1] pid=[13505] startTs [1575955678690] endTs [1575955678756] 
 iter : [1] test : [4] pid=[13510] startTs [1575955678691] endTs [1575955678883] 
 iter : [1] test : [6] pid=[13508] startTs [1575955678697] endTs [1575955678965] 
 iter : [1] test : [5] pid=[13507] startTs [1575955678697] endTs [1575955679106] 
 iter : [1] test : [2] pid=[13506] startTs [1575955678693] endTs [1575955679208] 
 iter : [1] test : [3] pid=[13509] startTs [1575955678697] endTs [1575955680438] 

iter : [2] test : [1] pid=[13514] startTs [1575955680653] endTs [1575955681291] 
 iter : [2] test : [4] pid=[13517] startTs [1575955681071] endTs [1575955681367] 
 iter : [2] test : [5] pid=[13518] startTs [1575955681071] endTs [1575955681396] 
 iter : [2] test : [6] pid=[13519] startTs [1575955681276] endTs [1575955681484] 
 iter : [2] test : [2] pid=[13515] startTs [1575955681071] endTs [1575955681645] 
 iter : [2] test : [3] pid=[13516] startTs [1575955681106] endTs [1575955682712] 

iter : [3] test : [1] pid=[13521] startTs [1575955682830] endTs [1575955682903] 
 iter : [3] test : [4] pid=[13524] startTs [1575955682819] endTs [1575955682938] 
 iter : [3] test : [6] pid=[13526] startTs [1575955682832] endTs [1575955683137] 
 iter : [3] test : [5] pid=[13525] startTs [1575955682863] endTs [1575955683428] 
 iter : [3] test : [2] pid=[13522] startTs [1575955682816] endTs [1575955683476] 
 iter : [3] test : [3] pid=[13523] startTs [1575955682902] endTs [1575955684056] 

此迭代的每个第一个会话在上一次迭代的最后一个 endTs 之后开始。但是 postgres 会话需要一点时间来关闭它可以理解全局并行执行。但实际上它是迭代的顺序和内部的并行。

谢谢。

4

1 回答 1

6

您需要展平结果列表而不是效果。此外,您应该考虑bracketManaged处理打开和关闭数据库连接,这样您就不必自己担心。我在下面提供了一个示例。

import zio._
import zio.console._

object Example extends App {

  final case class Result(iteration: Int, value: Int)

  def execute(iteration: Int, value: Int): URIO[Console, Result] =
    console.putStrLn(s"Executing iteration $iteration, value $value").as(Result(iteration, value))

  def executeSession(iteration: Int, values: List[Int]): URIO[Console, List[Result]] =
    URIO.bracket[Console, Unit, List[Result]](
      console.putStrLn(s"Opening database connection $iteration"),
      _ => console.putStrLn(s"Closing database connection $iteration"),
      _ => URIO.foreachPar(values)(value => execute(iteration, value))
    )

  def executeSessions(iterations: Int, values: List[Int]): URIO[Console, List[Result]] =
    URIO
      .foreach(List.range(1, iterations + 1)) { iteration =>
        executeSession(iteration, values)
      }
      .map(_.flatten)

  def program(n: Int): URIO[Console, List[Result]] =
    executeSessions(n, List(10, 20, 30))

  def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
    program(4).as(0)
}
于 2019-12-05T14:59:04.667 回答