2

我正在尝试使用 monix 来并行化某些操作,然后执行错误处理

假设我正在尝试解析和验证几个这样的对象

def parseAndValidateX(x: X) Task[X]

def parseAndValidateY(y: Y): Task[Y]

这里 X 和 Y 是我定义的一些类型。

现在,这些方法中的每一个都会评估一些标准并返回一个任务。如果评估失败,我有一些形式的代码

Task.raiseError(new CustomException("X is invalid because certain reasons a,b,c"))

我对 Y 有类似的任务加薪。

Task.raiseError(new CustomException("Y is invalid because certain reasons a',b',c'"))

现在我有这种类型

case class Combined(x: X, y: Y)

我定义了这个

private def parseAndValidateCombined(x: X, y: Y): Task[Combined] = {
  val xTask = parseAndValidateX(x)
  val yTask = parseAndValidateY(y)
     
  Task.parMap2(xTask, yTask) {
    (xEval, yEval) => SecretData(xEval, yTask)
  }
}

这应该允许我并行运行验证,果然我得到了响应。

但是我也想要这种行为

如果两个任务都失败,我想返回这样的错误

Task.raiseError(new CustomException("X is invalid because certain reasons a,b,c and "Y is invalid because certain reasons a',b',c'"))

我似乎无法做到这一点。根据两个任务中的哪一个失败,我只能在 parMap2 输出的 onRecover 方法上获得两个失败之一。

如果两者都失败,我只会收到任务 X 的错误。

我是否有可能以完全异步的方式完成我正在使用 Monix 所做的事情(例如,可能是其他一些将任务组合在一起的方法)?还是我必须阻止执行程序,单独获取错误并重新组合值?

4

1 回答 1

2

只有,parMap2不可能完成你想做的事情。文档说:

如果其中一个任务失败,那么所有其他任务都会被取消,最终结果将是失败。

但是,可以实现希望您想要暴露错误,而不是隐藏在单子处理之后。这可以通过materialize方法实现。

因此,例如,您可以将方法实现为:

private def parseAndValidateCombined[X, Y](x: X, y: Y): Task[Combined] = {
  val xTask = parseAndValidate(x).materialize // turn on Task[Try[X]]
  val yTask = parseAndValidate(y).materialize // turn on Task[Try[Y]]
     
  Task.parMap2(xTask, yTask) {
    case (Success(xEval), Success(yEval)) => Success(SecretData(xEval, yEval))
    case (Failure(_), Failure(_)) => Failure[Combined](new CustomException(....))
    case (Failure(left), _) => Failure[Combined](left)
    case (_, Failure(right)) => Failure[Combined](right)
  }.dematerialize // turn to Task[Combined]
}
于 2021-07-27T09:02:26.540 回答