0

我正在尝试使用 scala 并行集合。我正在尝试从已设置的本地服务器获取数据,这是我的代码

val httpRequestInputs = List(inputs).par

def getResponse(data: String, url: String) = {
    val request = basicRequest.body(text).post(url)
      .headers(Map("content-type" -> "text/plain", "charset" -> "utf-8"))
      .response(asString)

    implicit val backend
    = HttpURLConnectionBackend(options = SttpBackendOptions.connectionTimeout(5.minutes))
    request.readTimeout(5.minutes).send().body
}



// program executes from here
  httpRequestInputs.foreach { input =>
      val result = getResponse(input, url)
      result match {
          case Right(value) => println(value)
          case Left(value) => println("error")
     }

使用小尺寸输入时,没有问题,但是,当我尝试使用大输入尺寸时,程序抛出SocketException,我检查了服务器,服务器没有错误,在我看来,客户端正在关闭连接早期的。而且,这些大型输入在单独运行时通常需要不到 90 秒的时间来获得响应。

我尝试在 http 请求中扩展连接并读取超时选项,但仍然出现异常。

谁能帮我理解,为什么客户端要关闭连接?

对于http请求,我正在使用客户端com.softwaremill.sttp.client

4

1 回答 1

3

如果“大输入大小”意味着至少有几千个输入,并且每个输入都连接到同一个远程服务器,那么很可能你正在用尽运行它的临时端口范围:基本上有一个限制(变化从操作系统到操作系统)在一定时间内可以与同一远程主机和端口建立的连接数(Windows 文档,但据我所知,每个操作系统都有类似的限制)。

您要么需要捕获异常并重试,要么限制连接尝试,以免耗尽范围。(在极少数情况下,如果您尝试的次数不超过限制,则可能有一个操作系统配置选项可让您增加限制)。

使用 Scala 标准库来限制它的一个好方法是使用Future

import scala.concurrent.{ ExecutionContext, Future }

implicit val executionContext = ExecutionContext.fromExecutor(
  new java.util.concurrent.ForkJoinPool(1000) // Allow 1000 requests to be executing at once
)

val allRequestsFut =
  Future.sequence(
    httpRequestInputs.map { input =>
      Future { getResponse(input, url) }.map {
        _ match {
          case Right(value) => println(value)
          case Left(err) => println(s"error: $err")
        }
      }
    )

allRequestsFut.foreach { _ =>
  println("all requests complete")
}

请注意,许多操作系统(包括 Linux)将在端口关闭后继续保留临时端口一段时间。要动态限制请求,我建议使用 Akka Streams 之类的东西。

于 2020-07-01T19:39:07.777 回答