0

我正在尝试将 MonixTaskmongo-scala-driver. 我有点难以理解Error Handling

    val mongoClient: Resource[Task, MongoConnection[Task, DomainModel]] =
      MongoTypedConnection.create[Task, DomainModel](
        "mongodb:...&authMechanism=SCRAM-SHA-1"
      )

    mongoClient.use { client =>
      val changeStream: Task[ChangeStreamObservable[DomainModel]] =
        for {
          collection <- client.getMongoCollection("myDatabase", "myCollection")
          changes    <- client.watchCollection(collection)
        } yield changes
        ...
        ...
        ...
        .as(ExitCode.Success)
      }

当没有错误时,这非常有效。我想为此添加错误处理(例如处理不正确databasecollection名称)。我基于文档的初步尝试是尝试:

      val changeObs: io.Serializable =
        Await.result(changeStream
          .onErrorHandleWith {
            case _: TimeoutException =>
              // Oh, we know about timeouts, recover it
              Task.now("Recovered!")
            case other =>
              // We have no idea what happened, raise error!
              Task.raiseError(other)
          }.runToFuture, 5.seconds)

但这给了我一个io.Serializable. 我如何保留一段ChangeStreamObservable[DomainModel]时间同时进行某种简洁的错误处理?感谢我可以研究的任何模式的指针。

BR

4

1 回答 1

0

原来我看错了。

Task[ChangeStreamObservable[DomainModel]]已经有一个MonadError。对于像我这样的菜鸟来说,这本质上意味着它不会丢失错误。所以这可以在代码库的最后完成:

      changeStream //Or any other Task/Observable which (is composed)composes (from)this Task
        .onErrorHandle {
          case timeout: MongoTimeoutException =>
            logger.error(timeout.getMessage)
          case illegal: java.lang.IllegalArgumentException =>
            logger.error(illegal.getMessage)
          case unauthorized: com.mongodb.MongoCommandException =>
            logger.error(unauthorized.getMessage)

我试图运行 Task 只是为了处理代码库中间的错误,我认为如果我编写多个 Tasks/Observable,我会丢失最初的错误。

于 2020-06-18T11:05:26.583 回答