0

我最近发现了一个在使用 Monix 时无法完全理解的案例Task

有两个函数(在队列消息处理程序中):

  def handle(msg: RollbackMsg): Task[Unit] = {
    logger.info(s"Attempting to rollback transaction ${msg.lockId}")
    Task.defer(doRollback(msg)).onErrorRestart(5).foreachL { _ =>
      logger.info(s"Transaction ${msg.lockId} rolled back")
    }
  }

  private def doRollback(msg: RollbackMsg): Task[Unit] =
    (for {
      originalLock         <- findOrigLock(msg.lockId)
      existingClearanceOpt <- findExistingClearance(originalLock)
      _                    <- clearLock(originalLock, existingClearanceOpt)
    } yield ()).transact(xa)

的 for-comprehension的内部doRollback都是一组返回 monad 的doobie调用ConnectionIO[_],然后transact在其上运行将组合变成 Monix Task

现在,如handle函数所示,我希望整个过程在失败的情况下重试 5 次。神秘的部分是这个简单的调用:

doRollback(msg).onErrorRestart(5)

并没有真正重新启动异常操作(在测试中验证)。为了获得这种重试行为,我必须明确地将其包装在 中Task.defer,或者Task以任何其他方式将其置于“上下文”中。

这就是我不完全明白的一点:为什么会这样?doRollback已经给了我Task实例,所以我应该可以调用onErrorRestart它,不是吗?如果不是这种情况,我如何确定Task我从“某处”获得的实例是否可以重新启动?

我在这里想念什么?

4

0 回答 0