0

我在lagom下面有一个主题订阅者

fooService.fooTopic().subscribe
  .atLeastOnce(
    Flow[fooMsg].map {
      case fooMsg(_) =>
        foo()
      case a =>
        println(a)
    }.async.map{ _ =>
      Done
    }
  )

订阅这个主题,我使用atLeastOnce作为方法,所以如果有任何异常,我希望流程重新启动/重试。当我抛出一个正常的异常时,它可以继续正常重试

  private def foo() = {
    throw new RuntimeException("testing error")
  }

但是当将来发生异常时,无论我如何尝试,Flow都不会重新启动。这是我将来处理异常的尝试之一

  private def foo() = {
    val test: Future[Int] = Future(throw new RuntimeException("asd"))
    val result = for {
      y1 <- test
    } yield (y1)

    result.onComplete{
      case Success(value) => println("SUCCESS")
      case Failure(exception) => println(exception.getMessage)
                                 throw exception
    }
  }
  private def foo() = {
    val test: Future[Int] = Future(throw new RuntimeException("asd"))

    test.onComplete{
      case Success(value) => println("SUCCESS")
      case Failure(exception) => println(exception.getMessage)
                                 throw exception
    }
  }

它会显示一个异常,但Flow不会自动重新启动它。我应该如何处理/抛出异常Future

4

1 回答 1

1

如果你只失败了一个未来,我认为你不需要重新开始全流程。我建议只重试Future。例如,您可以编写这样的代码来重试您的调用,在您的方法调用上替换 Future.successful(10) :

        val test: Future[Int] = Future(throw new RuntimeException("asd")).recoverWith {
          case NonFatal(e) =>
            Future.successful(10)
        }

        val result = for {
          y1 <- test
        } yield (y1)

此外,您可以根据需要编写代码,它会失败并重试,但您需要返回结果Future

  kafka.topic1.subscribe.atLeastOnce(Flow[String]
    .mapAsync(1) {
      case envelope: String =>

        val test: Future[String] = Future(throw new RuntimeException("asd"))
      /*.recoverWith {
          case NonFatal(e) =>
            Future.successful("10")
        }*/

        val result = for {
          y1 <- test
        } yield (y1)

        println(s"code block $envelope")
       result.onComplete{
          case Success(value) => println(s"Message from topic: $envelope $result")
          case Failure(exception) => println(exception.getMessage)
            throw exception
        }
      result.map(_ => Done)
    }
)
于 2020-07-09T05:45:00.900 回答