9

我使用以下代码将一些数据发布到服务器

  def post(endpoint: String, entity: Strict) = {
    Http().singleRequest(HttpRequest(uri = Notifier.notificationUrl + endpoint, method = HttpMethods.POST,
      entity = entity)) onComplete {
      case Success(response) => response match {
        case HttpResponse(StatusCodes.OK, _, _, _) =>
          log.info("communicated successfully with Server")
      }
      case Failure(response) =>
        log.error("communicated failed with Server: {}", response)
    }
  }

每次演员收到如下消息10 seconds时都会调用它Notifier

case ecMonitorInformation: ECMonitorInformation =>
  post("monitor", httpEntityFromJson(ecMonitorInformation.toJson))

问题?

我看到最初(围绕5发送到服务器的请求),但后来它挂了,我没有看到任何日志记录,服务器没有收到任何数据。在客户端一段时间后,我看到以下内容

ERROR c.s.e.notification.Notifier - communicated failed with Server: java.lang.RuntimeException: Exceeded configured max-open-requests value of [32]

到底是怎么回事?我该如何解决这个问题?

4

4 回答 4

2

您还可以通过增加默认情况下max-open-requests的 akka 属性来克服此错误。32

要更改的属性将是:

akka.http.host-connection-pool.max-open-requests = 64

唯一需要注意的是,当客户端打开的并发连接数超过该参数的新值时,这将失败,在此示例中,如果打开的连接数超过64,您将收到相同的错误。

于 2019-09-12T13:32:06.227 回答
2

如果您要重复调用您的方法,您可能需要考虑使用此处描述的基于连接池的客户端方法之一:http: //doc.akka.io/docs/akka-stream-and-http-experimental /1.0/scala/http/client-side/index.html

您还可以在 akka-http 客户端配置中设置连接池设置:http: //doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/http/configuration.html#akka- http核心

搜索主机连接池。

于 2015-08-26T18:12:50.637 回答
2

您可以使用Source.queue而不是Source.single提供缓冲和溢出策略。在https://stackoverflow.com/a/35115314/1699837上查看更多详细信息

于 2016-02-01T10:49:44.513 回答
2

我浏览了文档并尝试了以下

val connectionFlow: Flow[HttpRequest, HttpResponse, 
        Future[Http.OutgoingConnection]] =
        Http().outgoingConnection(host = "localhost", port = 8080)

接着

  def httpPost(uri: String, httpEntity:Strict) {
    val responseFuture: Future[HttpResponse] =
      Source.single(HttpRequest(uri = "/monitor", method = HttpMethods.POST, entity=httpEntity))
        .via(connectionFlow)
        .runWith(Sink.head)

    responseFuture onComplete {
      case Success(response) => log.info("Communicated with Server: {}", response)
      case Failure(failure) => log.error("Communication failed with Server: {}", failure)
    }

这对我有用

于 2015-08-27T20:54:18.813 回答