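I have an Akka Streams flow that makes HTTP GET requests to an endpoint. When it runs on Kubernetes in a minikube environment, it fails during execution. The source elements are a list of ids.
Logs: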
07:54:26.943 [Extractor-akka.actor.default-dispatcher-42] DEBUG akka.http.impl.engine.client.PoolId - [7 (WaitingForResponseEntitySubscription)]Before event [onResponseEntitySubscribed] In state [WaitingForResponseEntitySubscription] for [15203 ms]
07:54:26.943 [Extractor-akka.actor.default-dispatcher-42] DEBUG akka.http.impl.engine.client.PoolId - [7 (WaitingForEndOfResponseEntity)]After event [onResponseEntitySubscribed] State change [WaitingForResponseEntitySubscription] -> [WaitingForEndOfResponseEntity]
07:54:26.944 [Extractor-akka.actor.default-dispatcher-42] DEBUG akka.http.impl.engine.client.PoolId - [7 (WaitingForEndOfResponseEntity)]Before event [onResponseEntityFailed] In state [WaitingForEndOfResponseEntity] for [1 ms]
07:54:26.944 [Extractor-akka.actor.default-dispatcher-42] DEBUG akka.http.impl.engine.client.PoolId - [7 (WaitingForEndOfResponseEntity)]Response entity for request [GET /api/v2/space/4292177/folder Empty] failed with [null]
07:54:26.944 [Extractor-akka.actor.default-dispatcher-42] DEBUG akka.http.impl.engine.client.PoolId - [7 (Failed)]After event [onResponseEntityFailed] State change [WaitingForEndOfResponseEntity] -> [Failed]
07:54:26.944 [Extractor-akka.actor.default-dispatcher-42] DEBUG akka.http.impl.engine.client.PoolId - [7 (Failed)]State change from [WaitingForEndOfResponseEntity] to [Failed(akka.stream.SubscriptionWithCancelException$NoMoreElementsNeeded$)]. Closing the existing connection.
07:54:26.949 [Extractor-akka.actor.default-dispatcher-42] DEBUG akka.http.impl.engine.client.PoolId - [7 (Unconnected)]Slot became idle... Trying to pull
07:54:10.937 [Extractor-akka.actor.default-dispatcher-6] DEBUG akka.actor.ActorSystemImpl - Outgoing request stream error akka.stream.SubscriptionWithCancelException$StageWasCompleted$
07:54:11.538 [Extractor-akka.actor.default-dispatcher-11] DEBUG akka.http.impl.engine.client.PoolId - [8 (WaitingForResponseEntitySubscription)]Connection cancelled
07:54:11.737 [Extractor-akka.actor.default-dispatcher-11] DEBUG akka.http.impl.engine.client.PoolId - [0 (WaitingForResponseEntitySubscription)]Connection failed
07:54:11.737 [Extractor-akka.actor.default-dispatcher-11] DEBUG akka.http.impl.engine.client.PoolId - [0 (WaitingForResponseEntitySubscription)]Before event [onConnectionFailed] In state [WaitingForResponseEntitySubscription] for [4895 ms]
07:54:11.737 [Extractor-akka.actor.default-dispatcher-11] DEBUG akka.http.impl.engine.client.PoolId - [0 (WaitingForResponseEntitySubscription)]After event [onConnectionFailed] State change [WaitingForResponseEntitySubscription] -> [WaitingForResponseEntitySubscription]
07:54:11.737 [Extractor-akka.actor.default-dispatcher-11] DEBUG akka.http.impl.engine.client.PoolId - [3 (WaitingForResponseEntitySubscription)]Connection failed
07:54:11.737 [Extractor-akka.actor.default-dispatcher-11] DEBUG akka.http.impl.engine.client.PoolId - [3 (WaitingForResponseEntitySubscription)]Before event [onConnectionFailed] In state [WaitingForResponseEntitySubscription] for [4892 ms]
07:54:11.738 [Extractor-akka.actor.default-dispatcher-25] DEBUG akka.http.impl.engine.client.PoolId - [3 (WaitingForResponseEntitySubscription)]After event [onConnectionFailed] State change [WaitingForResponseEntitySubscription] -> [WaitingForResponseEntitySubscription]
07:54:11.739 [Extractor-akka.actor.default-dispatcher-25] DEBUG akka.http.impl.engine.client.PoolId - [7 (WaitingForResponseEntitySubscription)]Connection failed
07:54:11.739 [Extractor-akka.actor.default-dispatcher-25] DEBUG akka.http.impl.engine.client.PoolId - [7 (WaitingForResponseEntitySubscription)]Before event [onConnectionFailed] In state [WaitingForResponseEntitySubscription] for [4902 ms]
07:54:11.739 [Extractor-akka.actor.default-dispatcher-25] DEBUG akka.http.impl.engine.client.PoolId - [7 (WaitingForResponseEntitySubscription)]After event [onConnectionFailed] State change [WaitingForResponseEntitySubscription] -> [WaitingForResponseEntitySubscription]
Akka code:
// Host-level connection pool; the String paired with each request is the space id.
val connection: Flow[(HttpRequest, String), (Try[HttpResponse], String), Http.HostConnectionPool] =
  Http().cachedHostConnectionPoolHttps[String]("api.private.com")

val createHttpRequest: String => HttpRequest = (id: String) =>
  HttpRequest(uri = s"/api/v2/space/${id}/")
    .withHeaders(RawHeader(AUTH, config.token))

def extractFolders(ref: ActorRef, spaceId: String): fs2.Stream[IO, String] = {
  import scala.concurrent.duration._
  import akka.util.Timeout
  implicit val duration: Timeout = 600.seconds

  // spaceId is a comma-separated list of ids
  Source(spaceId.trim.split(','))
    .map(i => (createHttpRequest(i), i))
    .via(connection)
    .map { x =>
      // x._1 is a Try[HttpResponse]; .get throws if the request failed
      val httpResponse: HttpResponse = x._1.get
      // Unmarshal(...).to[...] returns a Future[FoldersModel]
      Unmarshal(httpResponse).to[FoldersModel]
    }
    .ask[String](ref)
    .filter(i => i.nonEmpty)
    .reduce((a, b) => a + "," + b)
    .toStream[IO]
}
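For comparison, here is a minimal sketch of a variant I could try. I am not sure it is related to the failure. In the code above, `.map` with `Unmarshal(...).to[...]` emits a `Future` per element instead of the unmarshalled value, and `x._1.get` throws when the pool returns a `Failure`. The sketch awaits the unmarshalling with `mapAsync` and matches on the `Try` explicitly. The names `ExtractFoldersSketch`, `buildRequest` and `folderBodies` are hypothetical, the body is unmarshalled to `String` instead of `FoldersModel` to keep the example self-contained, the auth header is left out, the parallelism of 4 is arbitrary, and it assumes Akka 2.6+ so that the implicit `ActorSystem` also provides the materializer.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, StatusCodes}
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Future
import scala.util.{Failure, Success}

// Hypothetical names throughout; a sketch, not the original application code.
object ExtractFoldersSketch {

  // Pairs each request with its id so the pool hands the id back with the response.
  def buildRequest(id: String): (HttpRequest, String) =
    (HttpRequest(uri = s"/api/v2/space/$id/"), id)

  def folderBodies(ids: List[String])(implicit system: ActorSystem): Future[Seq[String]] = {
    import system.dispatcher

    Source(ids)
      .map(buildRequest)
      .via(Http().cachedHostConnectionPoolHttps[String]("api.private.com"))
      // mapAsync waits for each Future, so downstream sees bodies, not Futures.
      .mapAsync(parallelism = 4) {
        case (Success(response), _) if response.status == StatusCodes.OK =>
          // Assumption: unmarshal to String instead of FoldersModel.
          Unmarshal(response.entity).to[String]
        case (Success(response), id) =>
          // Drain the entity so the pool slot is released, then fail the stream.
          response.discardEntityBytes()
          Future.failed(new RuntimeException(s"space $id: unexpected status ${response.status}"))
        case (Failure(cause), _) =>
          Future.failed(cause)
      }
      .runWith(Sink.seq)
  }
}
```

With an implicit `ActorSystem` in scope, `ExtractFoldersSketch.folderBodies(spaceId.trim.split(',').toList)` would return a `Future` with one response body per id, or fail with the first error.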
application.conf:
include "akka-http-version"

akka {
  # Loggers to register at boot time (akka.event.Logging$DefaultLogger logs
  # to STDOUT)
  loggers = ["akka.event.slf4j.Slf4jLogger"]

  # Log level used by the configured loggers (see "loggers") as soon
  # as they have been started; before that, see "stdout-loglevel"
  # Options: OFF, ERROR, WARNING, INFO, DEBUG
  loglevel = "DEBUG"

  # Log level for the very basic logger activated during ActorSystem startup.
  # This logger prints the log messages to stdout (System.out).
  # Options: OFF, ERROR, WARNING, INFO, DEBUG
  stdout-loglevel = "DEBUG"

  logger-startup-timeout = 40s

  # Filter of log events that is used by the LoggingAdapter before
  # publishing log events to the eventStream.
  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"

  http {
    idle-timeout = infinite

    client {
      # The default value of the `User-Agent` header to produce if no
      # explicit `User-Agent`-header was included in a request.
      # If this value is the empty string and no header was included in
      # the request, no `User-Agent` header will be rendered at all.
      user-agent-header = akka-http/${akka.http.version}

      # The time period within which the TCP connecting process must be completed.
      connecting-timeout = 60s

      # The time after which an idle connection will be automatically closed.
      # Set to `infinite` to completely disable idle timeouts.
      idle-timeout = infinite

      # The initial size of the buffer to render the request headers in.
      # Can be used for fine-tuning request rendering performance but probably
      # doesn't have to be fiddled with in most applications.
      request-header-size-hint = 512

      # Enables/disables the logging of unencrypted HTTP traffic to and from the HTTP
      # client for debugging reasons.
      #
      # Note: Use with care. Logging of unencrypted data traffic may expose secret data.
      #
      # Incoming and outgoing traffic will be logged in hexdump format. To enable logging,
      # specify the number of bytes to log per chunk of data (the actual chunking depends
      # on implementation details and networking conditions and should be treated as
      # arbitrary).
      #
      # For logging on the server side, see akka.http.server.log-unencrypted-network-bytes.
      #
      # `off` : no log messages are produced
      # Int : determines how many bytes should be logged per data chunk
      log-unencrypted-network-bytes = off
    }

    host-connection-pool {
      # The maximum number of parallel connections that a connection pool to a
      # single host endpoint is allowed to establish. Must be greater than zero.
      max-connections = 100

      # The minimum number of parallel connections that a pool should keep alive ("hot").
      # If the number of connections is falling below the given threshold, new ones are being spawned.
      # You can use this setting to build a hot pool of "always on" connections.
      # Default is 0, meaning there might be no active connection at given moment.
      # Keep in mind that `min-connections` should be smaller than `max-connections` or equal
      min-connections = 0

      # The maximum number of times failed requests are attempted again,
      # (if the request can be safely retried) before giving up and returning an error.
      # Set to zero to completely disable request retries.
      max-retries = 9

      # The maximum number of open requests accepted into the pool across all
      # materializations of any of its client flows.
      # Protects against (accidentally) overloading a single pool with too many client flow materializations.
      # Note that with N concurrent materializations the max number of open request in the pool
      # will never exceed N * max-connections * pipelining-limit.
      # Must be a power of 2 and > 0!
      max-open-requests = 1024

      # The maximum number of requests that are dispatched to the target host in
      # batch-mode across a single connection (HTTP pipelining).
      # A setting of 1 disables HTTP pipelining, since only one request per
      # connection can be "in flight" at any time.
      # Set to higher values to enable HTTP pipelining.
      # This value must be > 0.
      # (Note that, independently of this setting, pipelining will never be done
      # on a connection that still has a non-idempotent request in flight.
      #
      # Before increasing this value, make sure you understand the effects of head-of-line blocking.
      # Using a connection pool, a request may be issued on a connection where a previous
      # long-running request hasn't finished yet. The response to the pipelined requests may then be stuck
      # behind the response of the long-running previous requests on the server. This may introduce an
      # unwanted "coupling" of run time between otherwise unrelated requests.
      #
      # See http://tools.ietf.org/html/rfc7230#section-6.3.2 for more info.)
      pipelining-limit = 100

      # The time after which an idle connection pool (without pending requests)
      # will automatically terminate itself. Set to `infinite` to completely disable idle timeouts.
      idle-timeout = infinite

      # The pool implementation to use. Currently supported are:
      # - legacy: the original, still default, pool implementation
      # - new: the new still-evolving pool implementation, that will receive fixes and new features
      pool-implementation = legacy

      # The "new" pool implementation will fail a connection early and clear the slot if a response entity was not
      # subscribed during the given time period after the response was dispatched. In busy systems the timeout might be
      # too tight if a response is not picked up quick enough after it was dispatched by the pool.
      response-entity-subscription-timeout = 60.seconds

      # Modify this section to tweak client settings only for host connection pools APIs like `Http().superPool` or
      # `Http().singleRequest`.
      base-connection-backoff = 0s
    }
  }
}
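The application fails only when it runs in Kubernetes on minikube. I suspect the problem is related to outgoing connections, but I have not been able to find the root cause of the error. What could be causing this, and how can I fix it? Any help is appreciated.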