0

我有一个 akka 流流,它向端点发出 get http 请求,并在 minikube 环境中的 kubernetes 上运行,因此在执行过程中失败。它采用源中元素的 id 列表。

日志:

07:54:26.943 [Extractor-akka.actor.default-dispatcher-42] DEBUG akka.http.impl.engine.client.PoolId - [7 (WaitingForResponseEntitySubscription)]Before event [onResponseEntitySubscribed] In state [WaitingForResponseEntitySubscription] for [152
03 ms]
07:54:26.943 [Extractor-akka.actor.default-dispatcher-42] DEBUG akka.http.impl.engine.client.PoolId - [7 (WaitingForEndOfResponseEntity)]After event [onResponseEntitySubscribed] State change [WaitingForResponseEntitySubscription] -> [WaitingF
orEndOfResponseEntity]
07:54:26.944 [Extractor-akka.actor.default-dispatcher-42] DEBUG akka.http.impl.engine.client.PoolId - [7 (WaitingForEndOfResponseEntity)]Before event [onResponseEntityFailed] In state [WaitingForEndOfResponseEntity] for [1 ms]
07:54:26.944 [Extractor-akka.actor.default-dispatcher-42] DEBUG akka.http.impl.engine.client.PoolId - [7 (WaitingForEndOfResponseEntity)]Response entity for request [GET /api/v2/space/4292177/folder Empty] failed with [null]
07:54:26.944 [Extractor-akka.actor.default-dispatcher-42] DEBUG akka.http.impl.engine.client.PoolId - [7 (Failed)]After event [onResponseEntityFailed] State change [WaitingForEndOfResponseEntity] -> [Failed]
07:54:26.944 [Extractor-akka.actor.default-dispatcher-42] DEBUG akka.http.impl.engine.client.PoolId - [7 (Failed)]State change from [WaitingForEndOfResponseEntity] to [Failed(akka.stream.SubscriptionWithCancelException$NoMoreElementsNeeded$)]
. Closing the existing connection.
07:54:26.949 [Extractor-akka.actor.default-dispatcher-42] DEBUG akka.http.impl.engine.client.PoolId - [7 (Unconnected)]Slot became idle... Trying to pull
07:54:10.937 [Extractor-akka.actor.default-dispatcher-6] DEBUG akka.actor.ActorSystemImpl - Outgoing request stream error akka.stream.SubscriptionWithCancelException$StageWasCompleted$
07:54:11.538 [Extractor-akka.actor.default-dispatcher-11] DEBUG akka.http.impl.engine.client.PoolId - [8 (WaitingForResponseEntitySubscription)]Connection cancelled
07:54:11.737 [Extractor-akka.actor.default-dispatcher-11] DEBUG akka.http.impl.engine.client.PoolId - [0 (WaitingForResponseEntitySubscription)]Connection failed
07:54:11.737 [Extractor-akka.actor.default-dispatcher-11] DEBUG akka.http.impl.engine.client.PoolId - [0 (WaitingForResponseEntitySubscription)]Before event [onConnectionFailed] In state [WaitingForResponseEntitySubscription] for [4895 ms]
07:54:11.737 [Extractor-akka.actor.default-dispatcher-11] DEBUG akka.http.impl.engine.client.PoolId - [0 (WaitingForResponseEntitySubscription)]After event [onConnectionFailed] State change [WaitingForResponseEntitySubscription] -> [WaitingFo
rResponseEntitySubscription]
07:54:11.737 [Extractor-akka.actor.default-dispatcher-11] DEBUG akka.http.impl.engine.client.PoolId - [3 (WaitingForResponseEntitySubscription)]Connection failed
07:54:11.737 [Extractor-akka.actor.default-dispatcher-11] DEBUG akka.http.impl.engine.client.PoolId - [3 (WaitingForResponseEntitySubscription)]Before event [onConnectionFailed] In state [WaitingForResponseEntitySubscription] for [4892 ms]
07:54:11.738 [Extractor-akka.actor.default-dispatcher-25] DEBUG akka.http.impl.engine.client.PoolId - [3 (WaitingForResponseEntitySubscription)]After event [onConnectionFailed] State change [WaitingForResponseEntitySubscription] -> [WaitingFo
rResponseEntitySubscription]
07:54:11.739 [Extractor-akka.actor.default-dispatcher-25] DEBUG akka.http.impl.engine.client.PoolId - [7 (WaitingForResponseEntitySubscription)]Connection failed
07:54:11.739 [Extractor-akka.actor.default-dispatcher-25] DEBUG akka.http.impl.engine.client.PoolId - [7 (WaitingForResponseEntitySubscription)]Before event [onConnectionFailed] In state [WaitingForResponseEntitySubscription] for [4902 ms]
07:54:11.739 [Extractor-akka.actor.default-dispatcher-25] DEBUG akka.http.impl.engine.client.PoolId - [7 (WaitingForResponseEntitySubscription)]After event [onConnectionFailed] State change [WaitingForResponseEntitySubscription] -> [WaitingFo
rResponseEntitySubscription]
7:54:26.943 [Extractor-akka.actor.default-dispatcher-42] DEBUG akka.http.impl.engine.client.PoolId - [7 (WaitingForEndOfResponseEntity)]After event [onResponseEntitySubscribed] State change [WaitingForResponseEntitySubscription] -> [WaitingF
orEndOfResponseEntity]
07:54:26.944 [Extractor-akka.actor.default-dispatcher-42] DEBUG akka.http.impl.engine.client.PoolId - [7 (WaitingForEndOfResponseEntity)]Before event [onResponseEntityFailed] In state [WaitingForEndOfResponseEntity] for [1 ms]
07:54:26.944 [Extractor-akka.actor.default-dispatcher-42] DEBUG akka.http.impl.engine.client.PoolId - [7 (WaitingForEndOfResponseEntity)]Response entity for request [GET /api/v2/space/4292177/folder Empty] failed with [null]
07:54:26.944 [Extractor-akka.actor.default-dispatcher-42] DEBUG akka.http.impl.engine.client.PoolId - [7 (Failed)]After event [onResponseEntityFailed] State change [WaitingForEndOfResponseEntity] -> [Failed]
07:54:26.944 [Extractor-akka.actor.default-dispatcher-42] DEBUG akka.http.impl.engine.client.PoolId - [7 (Failed)]State change from [WaitingForEndOfResponseEntity] to [Failed(akka.stream.SubscriptionWithCancelException$NoMoreElementsNeeded$)]
. Closing the existing connection.

阿卡代码:

 val connection: Flow[(HttpRequest, String), (Try[HttpResponse], String), Http.HostConnectionPool] =
    Http().cachedHostConnectionPoolHttps[String]("api.private.com")

  val createHttpRequest: String => HttpRequest = (id: String) => HttpRequest(uri = s"/api/v2/space/${id}/")
    .withHeaders(RawHeader(AUTH, config.token))


  def extractFolders(ref: ActorRef, spaceId:String): fs2.Stream[IO, String] = {
    import scala.concurrent.duration._
    import akka.util.Timeout
    implicit val duration: Timeout = 600.seconds

    Source(spaceId.trim.split(','))
      .map(i => (createHttpRequest(i), i))
      .via(connection)
      .map {
        x =>
          val httpResponse: HttpResponse = x._1.get
          Unmarshal(httpResponse).to[FoldersModel]
      }
      .ask[String](ref)
      .filter(i => i.nonEmpty)
      .reduce((a,b) => a+","+b)
      .toStream[IO]
  }

应用程序.conf:

include "akka-http-version"
akka {

  # Loggers to register at boot time (akka.event.Logging$DefaultLogger logs
  # to STDOUT)
  loggers = ["akka.event.slf4j.Slf4jLogger"]

  # Log level used by the configured loggers (see "loggers") as soon
  # as they have been started; before that, see "stdout-loglevel"
  # Options: OFF, ERROR, WARNING, INFO, DEBUG
  loglevel = "DEBUG"

  # Log level for the very basic logger activated during ActorSystem startup.
  # This logger prints the log messages to stdout (System.out).
  # Options: OFF, ERROR, WARNING, INFO, DEBUG
  stdout-loglevel = "DEBUG"

  logger-startup-timeout = 40s

  # Filter of log events that is used by the LoggingAdapter before
  # publishing log events to the eventStream.
  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"



  http {


    idle-timeout =  infinite

    client {
      # The default value of the `User-Agent` header to produce if no
      # explicit `User-Agent`-header was included in a request.
      # If this value is the empty string and no header was included in
      # the request, no `User-Agent` header will be rendered at all.
      user-agent-header = akka-http/${akka.http.version}

      # The time period within which the TCP connecting process must be completed.
      connecting-timeout = 60s

      # The time after which an idle connection will be automatically closed.
      # Set to `infinite` to completely disable idle timeouts.
      idle-timeout = infinite

      # The initial size of the buffer to render the request headers in.
      # Can be used for fine-tuning request rendering performance but probably
      # doesn't have to be fiddled with in most applications.
      request-header-size-hint = 512



      # Enables/disables the logging of unencrypted HTTP traffic to and from the HTTP
      # client for debugging reasons.
      #
      # Note: Use with care. Logging of unencrypted data traffic may expose secret data.
      #
      # Incoming and outgoing traffic will be logged in hexdump format. To enable logging,
      # specify the number of bytes to log per chunk of data (the actual chunking depends
      # on implementation details and networking conditions and should be treated as
      # arbitrary).
      #
      # For logging on the server side, see akka.http.server.log-unencrypted-network-bytes.
      #
      # `off` : no log messages are produced
      # Int   : determines how many bytes should be logged per data chunk
      log-unencrypted-network-bytes = off
    }

    host-connection-pool {
      # The maximum number of parallel connections that a connection pool to a
      # single host endpoint is allowed to establish. Must be greater than zero.
      max-connections = 100

      # The minimum number of parallel connections that a pool should keep alive ("hot").
      # If the number of connections is falling below the given threshold, new ones are being spawned.
      # You can use this setting to build a hot pool of "always on" connections.
      # Default is 0, meaning there might be no active connection at given moment.
      # Keep in mind that `min-connections` should be smaller than `max-connections` or equal
      min-connections = 0

      # The maximum number of times failed requests are attempted again,
      # (if the request can be safely retried) before giving up and returning an error.
      # Set to zero to completely disable request retries.
      max-retries = 9

      # The maximum number of open requests accepted into the pool across all
      # materializations of any of its client flows.
      # Protects against (accidentally) overloading a single pool with too many client flow materializations.
      # Note that with N concurrent materializations the max number of open request in the pool
      # will never exceed N * max-connections * pipelining-limit.
      # Must be a power of 2 and > 0!
      max-open-requests = 1024

      # The maximum number of requests that are dispatched to the target host in
      # batch-mode across a single connection (HTTP pipelining).
      # A setting of 1 disables HTTP pipelining, since only one request per
      # connection can be "in flight" at any time.
      # Set to higher values to enable HTTP pipelining.
      # This value must be > 0.
      # (Note that, independently of this setting, pipelining will never be done
      # on a connection that still has a non-idempotent request in flight.
      #
      # Before increasing this value, make sure you understand the effects of head-of-line blocking.
      # Using a connection pool, a request may be issued on a connection where a previous
      # long-running request hasn't finished yet. The response to the pipelined requests may then be stuck
      # behind the response of the long-running previous requests on the server. This may introduce an
      # unwanted "coupling" of run time between otherwise unrelated requests.
      #
      # See http://tools.ietf.org/html/rfc7230#section-6.3.2 for more info.)
      pipelining-limit = 100

      # The time after which an idle connection pool (without pending requests)
      # will automatically terminate itself. Set to `infinite` to completely disable idle timeouts.
      idle-timeout =  infinite

      # The pool implementation to use. Currently supported are:
      #  - legacy: the original, still default, pool implementation
      #  - new: the new still-evolving pool implementation, that will receive fixes and new features
      pool-implementation = legacy

      # The "new" pool implementation will fail a connection early and clear the slot if a response entity was not
      # subscribed during the given time period after the response was dispatched. In busy systems the timeout might be
      # too tight if a response is not picked up quick enough after it was dispatched by the pool.
      response-entity-subscription-timeout = 60.seconds

      # Modify this section to tweak client settings only for host connection pools APIs like `Http().superPool` or
      # `Http().singleRequest`.
      base-connection-backoff = 0s

    }
  }
}

该应用程序仅在 minikube 上的 kubernetes 中失败,我不知道是什么原因,我认为它与传出连接有关,但总的来说我没有找到错误的根本原因。这种情况的解决方案是什么或有什么帮助?

4

0 回答 0