0

我正在运行 vertx 的本地实例。路由器将我的请求重定向到具有以下处理程序的工作节点:

protected Handler<Message<JsonObject>> handler1() {
    return msg -> {
        final RequestBody req = mapper.readValue(msg.body().encode(), RequestBody.class);
        processRequest(req, msg);
    }
}

processRequest 函数接受请求正文,调用两个外部服务,聚合响应,然后返回给客户端。

private processRequest(RequestBody req, Message<JsonObject> msg) {
    CompletableFuture<String> service1Response = getService1ResponseAsync(req); // Make async call to service 1
    String service2Response = getService2ResponseSync(req); // Make sync call to service 2

    ResponseBody response = aggregateResult(service1Response.join(), service2Response);  // Tag1
    msg.reply(mapper.writeValueAsString(response));
}

private CompletableFuture<String> getService1ResponseAsync(RequestBody req) {
    CompletableFuture<String> result = new CompletableFuture();
    // Below handler call makes GET call to service 1 using apache HTTP client and returns the response
    vertx.eventBus().request("serviceVerticleAddr1", mapper.writeValueAsString(req), new DeliveryOptions(), reply -> { // Tag2
        if (reply.succeeded())
            result.complete(reply.result().body().toString());
        else
            result.completeExceptionally(result.cause());
    }
}

当我点击上述 API 时,我的请求超时。分配用于执行我的请求的工作池中的线程在 Tag1 处被永久阻塞。在进一步调试时,我发现 Tag2 中调用的回复处理程序没有被调用。

服务 verticle (serviceVerticleAddr1) [ie Tag2] 中的处理程序为使用它的其他 API 返回正确的响应,但对我来说它被阻塞了。有人可以帮我确定原因吗?当调用 vertx.eventBus().request [Tag2] 的线程开始在 service1Response.join() [Tag1] 处等待未来完成时,是否会形成某种死锁?

4

1 回答 1

1

我认为被阻止是因为消费者没有通知发件人消息已被处理。我建议您检查为serviceVerticleAddr1地址消费者注册的处理程序块内部,并确保正在回复(通知)发件人请求的消息已成功处理(或未成功处理)。消费者可能看起来像

vertx.eventBus().consumer("serviceVerticleAddr1", message -> {
  try{
     doSomething();
     message.reply("done");
  } catch(){
     message.fail(0, "fails");
  }
});

这样,发送者的异步处理程序将被通知消费者可以处理请求的消息

于 2020-12-01T22:46:23.853 回答