1

我有两个服务:一个发送流数据,第二个使用 akka-grpc 接收它进行通信。提供源数据时,调用服务一进行处理,并通过 grpc 客户端将其发送给服务二。当同时提供多个源数据时,服务器 1 的多个实例可能同时运行。在我的应用程序的长时间运行测试中。我在服务一中看到以下错误:

ERROR i.a.g.application.actors.DbActor - GraphStage [akka.grpc.internal.AkkaNettyGrpcClientGraphStage$$anon$1@59d40805] terminated abruptly, caused by for example materializer or act
  akka.stream.AbruptStageTerminationException: GraphStage [akka.grpc.internal.AkkaNettyGrpcClientGraphStage$$anon$1@59d40805] terminated abruptly, caused by for example materializer or actor system termination.

我从来没有关闭演员系统,只是在完成工作后才杀死演员。我也使用proto3andhttp2来请求绑定。这是我在服务中的一段代码:

////////////////////server http binding /////////
 val service: HttpRequest => Future[HttpResponse] =
  ServiceOneServiceHandler(new ServiceOneServiceImpl(system))

val bound = Http().bindAndHandleAsync(
  service,
  interface = config.getString("akka.grpc.server.interface"),
  port = config.getString("akka.grpc.server.default-http-port").toInt,
  connectionContext = HttpConnectionContext(http2 = Always))

bound.foreach { binding =>
  logger.info(s"gRPC server bound to: ${binding.localAddress}")
}

////////////////////client /////////
def send2Server[A](data: ListBuffer[A]): Future[ResponseDTO] = {
val reply = {

      val thisClient = interface.initialize()
      interface.call(client = thisClient, req = data.asInstanceOf[ListBuffer[StoreRequest]].toList)

  }
  reply
}

///////////////// grpc communication //////////
def send2GrpcServer[A](data: ListBuffer[A]): Unit = {
val reply = send2Server(data)
Await.ready(reply, Duration.Inf) onComplete {
  case util.Success(response: ResponseDTO) =>
    logger.info(s"got reply message: ${res.description}")

    //////check response content and stop application if desired result not found in response
    }
  case util.Failure(exp) =>
    //////stop application
    throw exp.getCause
}

}

等待服务 2 响应后发生错误:

Await.ready(reply, Duration.Inf)

我无法找到错误的原因。

更新

我发现丢失了一些流,以至于服务一发送一个流无限期地等待响应,而服务二没有收到任何东西来回复服务一但仍然不知道为什么丢失了流我也更新了akka grpc插件但是没有意义:

addSbtPlugin("com.lightbend.akka.grpc" % "sbt-akka-grpc" % "0.6.1")

addSbtPlugin("com.lightbend.sbt" % "sbt-javaagent" % "0.1.4") 
4

0 回答 0