话题
我想通过 Akka HTTP 与 Akka Actor 交互。这个想法是有一个系统,其中 HTTP 客户端调用 Akka HTTP 服务器方法,该方法处理对 Akka Actor 的请求。参与者处理消息并响应调用者(Akka HTTP),调用者回答 HTTP 客户端。我设法按照上面的描述做了,但我认为我做的不对,因为我的实现似乎是阻塞的。
我解释得更好:如果我发出许多并发 HTTP 请求,我会看到 Akka HTTP“创建一个队列”,因此在发送以下请求之前等待参与者处理请求。
相反,我想获得的是Akka HTTP 服务器将来自 HTTP 客户端的请求立即转发到目标 akka 演员,而无需等待演员结束阐述。 我想使用actor邮箱容量参数来确定消息队列有多大,如果消息太多则拒绝消息。
因此,我需要一种方法让 Akka HTTP 异步等待参与者响应。
我知道邮箱容量工作正常,因为如果我改为使用简单的actor2.tell("Prova1", system.deadLetters())向我的演员发出许多请求(仅用于测试),超过邮箱大小的请求是正确的被拒绝。
参考
为了测试我的系统,我按照 akka 文档提供的最小示例创建了一个简单的配置。这对于 akka http: https ://doc.akka.io/docs/akka-http/current/routing-dsl/index.html#minimal-example
以及用于创建我的演员的以下内容: https ://doc.akka.io/docs/akka/current/actors.html#creating-actors
我的代码
我做的第一件事是创建一个带有一个actor(actor1)的系统,配置akka HTTP如下:
public class TestActor {
private static ActorSystem system;
public static void main(String[] args) throws InterruptedException
{
String httpBindAddress = "0.0.0.0";
int httpPort = 8086;
system = ActorSystem.create("deupnp");
ActorMaterializer materializer = ActorMaterializer.create(system);
Http http = Http.get(system);
AllDirectives app = new AllDirectives() {
};
Route routeActor = app.get(() ->
app.pathPrefix("mysuburl", () ->
app.pathPrefix(akka.http.javadsl.unmarshalling.StringUnmarshallers.STRING, actor ->
app.path(akka.http.javadsl.unmarshalling.StringUnmarshallers.STRING, message ->
app.onSuccess(() ->
CompletableFuture.supplyAsync(() -> actorFunctionCall(actor, message)), response ->
app.complete(StatusCodes.get(200), response))))));
Flow<HttpRequest, HttpResponse, NotUsed> routeFlow = app.route(routeActor).flow(system, materializer);
CompletionStage<ServerBinding> binding = http.bindAndHandle(routeFlow, ConnectHttp.toHost(httpBindAddress, httpPort), materializer);
// create system with one actor
ActorRef actor1 = system.actorOf(Props.create(ActorTest.class,"actor1").withMailbox("my-mailbox"),"actor1");
}
private static String actorFunctionCall(String actor, String message)
{
try {
Inbox inbox = Inbox.create(system);
system.actorSelection("user/"+actor).tell(message, inbox.getRef());
String response = (String) inbox.receive(Duration.create(20000, TimeUnit.SECONDS));
return response;
} catch (Exception e) {
//return new ResponseMessage(204,"Error");
e.printStackTrace();
return null;
}
}
}
我的 ActorTest 如下:
public class ActorTest extends AbstractActor {
private String myName = "";
public ActorTest(String nome){
this.myName = nome;
}
@Override
public void preStart()
{
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(String.class,
message -> {
Thread.sleep(5000l);
System.out.println(this.getClass().getName() + " >> " + myName + " >> " + message);
})
.matchAny(mex->{
System.out.println("Error");
})
.build();
}
}
我的 application.conf 非常简单:
akka
{
stdout-loglevel = "DEBUG"
loglevel = "DEBUG"
actor {
default-dispatcher {
throughput = 10
}
}
}
my-mailbox {
mailbox-type = "akka.dispatch.NonBlockingBoundedMailbox"
mailbox-capacity = 1
}
预期成绩
如您所见,如果邮箱容量 = 1,我希望,如果我发出超过 1 个并发请求,则只有一个被处理,其余的被丢弃。
我认为上面的代码对于我想要获得的内容不正确,因为我使用 Akka HTTP 路由来接收http://127.0.0.1/mysuburl/actor1/my_msg上的 HTTP 请求,然后使用收件箱发送消息到演员并等待回应。
所以我的问题是:以异步方式将我的 Akka HTTP 请求链接到我的 Akka Actor actor 1 的正确方法是什么?
如果您需要更多详细信息,请告诉我。
笔记
我什至阅读了以下文章: https ://doc.akka.io/docs/akka-http/current/handling-blocking-operations-in-akka-http-routes.html
它解释了如何创建有限数量的线程以处理多个阻塞请求,但我认为这只会“减轻”我的代码的影响,它是阻塞的,但必须以不阻塞的方式编写。