1

话题

我想通过 Akka HTTP 与 Akka Actor 交互。这个想法是有一个系统,其中 HTTP 客户端调用 Akka HTTP 服务器方法,该方法处理对 Akka Actor 的请求。参与者处理消息并响应调用者(Akka HTTP),调用者回答 HTTP 客户端。我设法按照上面的描述做了,但我认为我做的不对,因为我的实现似乎是阻塞的。

我解释得更好:如果我发出许多并发 HTTP 请求,我会看到 Akka HTTP“创建一个队列”,因此在发送以下请求之前等待参与者处理请求。

相反,我想获得的是Akka HTTP 服务器将来自 HTTP 客户端的请求立即转发到目标 akka 演员,而无需等待演员结束阐述。 我想使用actor邮箱容量参数来确定消息队列有多大,如果消息太多则拒绝消息。

因此,我需要一种方法让 Akka HTTP 异步等待参与者响应。

我知道邮箱容量工作正常,因为如果我改为使用简单的actor2.tell("Prova1", system.deadLetters())向我的演员发出许多请求(仅用于测试),超过邮箱大小的请求是正确的被拒绝。


参考

为了测试我的系统,我按照 akka 文档提供的最小示例创建了一个简单的配置。这对于 akka http: https ://doc.akka.io/docs/akka-http/current/routing-dsl/index.html#minimal-example

以及用于创建我的演员的以下内容: https ://doc.akka.io/docs/akka/current/actors.html#creating-actors


我的代码

我做的第一件事是创建一个带有一个actor(actor1)的系统,配置akka HTTP如下:

public class TestActor {

    private static ActorSystem system;

    public static void main(String[] args) throws InterruptedException
    {
        String httpBindAddress = "0.0.0.0";
        int httpPort = 8086;        
        system = ActorSystem.create("deupnp");
        ActorMaterializer materializer = ActorMaterializer.create(system);      
        Http http = Http.get(system);
        AllDirectives app = new AllDirectives() {           
        };

        Route routeActor =  app.get(() ->
        app.pathPrefix("mysuburl", () -> 
        app.pathPrefix(akka.http.javadsl.unmarshalling.StringUnmarshallers.STRING, actor -> 
        app.path(akka.http.javadsl.unmarshalling.StringUnmarshallers.STRING, message -> 
        app.onSuccess(() -> 
        CompletableFuture.supplyAsync(() -> actorFunctionCall(actor, message)), response -> 
        app.complete(StatusCodes.get(200), response))))));

        Flow<HttpRequest, HttpResponse, NotUsed> routeFlow = app.route(routeActor).flow(system, materializer);
        CompletionStage<ServerBinding> binding = http.bindAndHandle(routeFlow, ConnectHttp.toHost(httpBindAddress, httpPort), materializer);

        // create system with one actor
        ActorRef actor1 = system.actorOf(Props.create(ActorTest.class,"actor1").withMailbox("my-mailbox"),"actor1");    
    }

    private static String actorFunctionCall(String actor, String message)
    {
        try {
            Inbox inbox = Inbox.create(system);
            system.actorSelection("user/"+actor).tell(message, inbox.getRef());
            String response  = (String) inbox.receive(Duration.create(20000, TimeUnit.SECONDS));
            return response;
        } catch (Exception e) {
            //return new ResponseMessage(204,"Error");
            e.printStackTrace();
            return null;
        }
    }
}

我的 ActorTest 如下:

public class ActorTest extends AbstractActor {

    private String myName = ""; 

    public ActorTest(String nome){
        this.myName = nome;
    }

    @Override
    public void preStart()
    {
    }

    @Override
    public Receive createReceive() {        
        return receiveBuilder()
            .match(String.class, 
                message -> {
                Thread.sleep(5000l);
                System.out.println(this.getClass().getName() + " >> " + myName + " >> " + message);
            })
            .matchAny(mex->{
                System.out.println("Error");
            })
            .build();
    }   
}

我的 application.conf 非常简单:

akka
{
    stdout-loglevel = "DEBUG"
    loglevel = "DEBUG"
    actor {
        default-dispatcher {
            throughput = 10
        }
    }
}

my-mailbox {
    mailbox-type = "akka.dispatch.NonBlockingBoundedMailbox"
    mailbox-capacity = 1
}


预期成绩

如您所见,如果邮箱容量 = 1,我希望,如果我发出超过 1 个并发请求,则只有一个被处理,其余的被丢弃。

我认为上面的代码对于我想要获得的内容不正确,因为我使用 Akka HTTP 路由来接收http://127.0.0.1/mysuburl/actor1/my_msg上的 HTTP 请求,然后使用收件箱发送消息到演员并等待回应。

所以我的问题是:以异步方式将我的 Akka HTTP 请求链接到我的 Akka Actor actor 1 的正确方法是什么?

如果您需要更多详细信息,请告诉我。

笔记

我什至阅读了以下文章: https ://doc.akka.io/docs/akka-http/current/handling-blocking-operations-in-akka-http-routes.html

它解释了如何创建有限数量的线程以处理多个阻塞请求,但我认为这只会“减轻”我的代码的影响,它是阻塞的,但必须以不阻塞的方式编写。

4

0 回答 0