7

我有一个使用 Akka Server HTTP 处理传入请求的服务(我们称之为服务 A)。我也有提供多种网络服务的第 3 方应用程序(服务 B)。服务 A 的目的是转换客户端请求,调用服务 B 的一个或多个 Web 服务,合并/转换结果并将其返回给客户端。

我将 Actors 用于某些部分,而将 Future 用于其他部分。为了调用服务 B,我使用 Akka HTTP 客户端。

Http.get(actorSystem).singleRequest(HttpRequest.create()
        .withUri("http://127.0.0.1:8082/test"), materializer)
        .onComplete(...)

问题是,每个 Service A 请求都会创建一个新流,如果有多个并发连接,则会导致akka.stream.OverflowStrategy$Fail$BufferOverflowException: Exceeded configured max-open-requests value of [32] error

我已经问过这个问题并得到了使用单个 Flow如何正确调用 Akka HTTP 客户端来处理多个(10k - 100k)请求的建议?

虽然它适用于来自一个地方的一批请求,但我不知道如何使用来自所有并发请求处理程序的单个 Flow。

正确的“阿卡方式”是什么?

4

3 回答 3

13

我认为您可以Source.queue用来缓冲您的请求。下面的代码假定您需要从 3rd 方服务中获得答案,因此Future[HttpResponse]非常欢迎。这样,您还可以提供溢出策略来防止资源匮乏。

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}

import scala.concurrent.duration._
import scala.concurrent.{Await, Future, Promise}
import scala.util.{Failure, Success}

import scala.concurrent.ExecutionContext.Implicits.global

implicit val system = ActorSystem("main")
implicit val materializer = ActorMaterializer()
val pool = Http().cachedHostConnectionPool[Promise[HttpResponse]](host = "google.com", port = 80)
val queue = Source.queue[(HttpRequest, Promise[HttpResponse])](10, OverflowStrategy.dropNew)
  .via(pool)
  .toMat(Sink.foreach({
    case ((Success(resp), p)) => p.success(resp)
    case ((Failure(e), p)) => p.failure(e)
  }))(Keep.left)
  .run
val promise = Promise[HttpResponse]
val request = HttpRequest(uri = "/") -> promise

val response = queue.offer(request).flatMap(buffered => {
  if (buffered) promise.future
  else Future.failed(new RuntimeException())
})

Await.ready(response, 3 seconds)

(代码从我的博客文章中复制)

于 2016-01-31T15:05:31.397 回答
4

这是已接受答案的 Java 版本

final Flow<
    Pair<HttpRequest, Promise<HttpResponse>>,
    Pair<Try<HttpResponse>, Promise<HttpResponse>>,
    NotUsed> flow =
    Http.get(actorSystem).superPool(materializer);

final SourceQueue<Pair<HttpRequest, Promise<HttpResponse>>> queue = Source.<Pair<HttpRequest, Promise<HttpResponse>>>
    queue(BUFFER_SIZE, OverflowStrategy.dropNew())
    .via(flow)
        .toMat(Sink.foreach(p -> p.second().complete(p.first())), Keep.left())
        .run(materializer);

...

public CompletionStage<HttpResponse> request(HttpRequest request) {
    log.debug("Making request {}", request);

    Promise<HttpResponse> promise = Futures.promise();
    return queue.offer(Pair.create(request, promise))
        .thenCompose(buffered -> {
            if (buffered instanceof QueueOfferResult.Enqueued$) {
                return FutureConverters.toJava(promise.future())
                    .thenApply(resp -> {
                        if (log.isDebugEnabled()) {
                            log.debug("Got response {} {}", resp.status(), resp.getHeaders());
                        }
                        return resp;
                    });
            } else {
                log.error("Could not buffer request {}", request);
                return CompletableFuture.completedFuture(HttpResponse.create().withStatus(StatusCodes.SERVICE_UNAVAILABLE));
            }
        });
}
于 2016-01-21T21:36:38.387 回答
0

您需要做的就是在您的服务 A 代码中为服务 B设置一个HostConnectionPool 。这将为您提供一个Flow可以添加到您的服务 A 流,以使用连接池而不是每个流的新连接将请求从 A 分派到 B。从文档中:

与连接级客户端 API 不同,主机级 API 使您无需手动管理单个 HTTP 连接。它自主管理与一个特定目标端点(即主机/端口组合)的可配置连接池。

此流在不同流中的每个具体化都将从这个底层连接池中提取:

获取到给定目标端点的连接池的最佳方法是Http.get(system).cachedHostConnectionPool(...)方法,它返回一个Flow可以“烘焙”到应用程序级流设置中的方法。此流也称为“池客户端流”。

于 2016-01-17T15:31:37.233 回答