
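We recently changed our APIs to use a combination of the OpenAPI web contract and the event bus to send and receive API responses for our clients.

It worked fine until today, when we noticed that most of our APIs were timing out and throwing the following error.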

API FAILURE: (TIMEOUT,-1) Timed out after waiting 30000(ms) for a reply. address: __vertx.reply.448, repliedAddress: /v1/analytics/post
at io.vertx.core.eventbus.impl.HandlerRegistration.sendAsyncResultFailure(HandlerRegistration.java:146)
at io.vertx.core.eventbus.impl.HandlerRegistration.lambda$new$0(HandlerRegistration.java:78)
at io.vertx.core.impl.VertxImpl$InternalTimerHandler.handle(VertxImpl.java:907)
at io.vertx.core.impl.VertxImpl$InternalTimerHandler.handle(VertxImpl.java:866)
at io.vertx.core.impl.ContextImpl.executeTask(ContextImpl.java:320)
at io.vertx.core.impl.EventLoopContext.lambda$executeAsync$0(EventLoopContext.java:38)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:462)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:897)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)

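We cannot debug this on the consumer side, because the consumer never even receives the message it is supposed to reply to. Our logs contain only these error messages.

Here is an example of the OpenAPI web contract and event bus setup. All of our APIs are configured this way: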
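For context, here is a minimal plain-Java sketch of the request/reply-with-timeout pattern that this error message describes. It is not Vert.x code: `ReplyTimeoutSketch`, `request` and `outcome` are hypothetical names used only for illustration, and the 200 ms timeout stands in for the 30000 ms in the trace. It only shows that a sender waiting for a reply fails with a timeout whenever the consumer side never replies.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

public class ReplyTimeoutSketch {
    // Daemon timer thread so the JVM can exit once main() returns.
    static final ScheduledExecutorService TIMER =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r);
                t.setDaemon(true);
                return t;
            });

    // Hands a pending reply to the consumer and fails it if nothing
    // completes it within timeoutMs (hypothetical helper, not a Vert.x API).
    static CompletableFuture<String> request(Consumer<CompletableFuture<String>> consumer,
                                             long timeoutMs) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        TIMER.schedule(() -> {
            // No effect if the consumer has already replied.
            reply.completeExceptionally(new TimeoutException(
                    "Timed out after waiting " + timeoutMs + "(ms) for a reply"));
        }, timeoutMs, TimeUnit.MILLISECONDS);
        consumer.accept(reply);
        return reply;
    }

    // Blocks for the result and renders success or failure as a string.
    static String outcome(CompletableFuture<String> pending) {
        try {
            return pending.get();
        } catch (ExecutionException e) {
            return "FAILED: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "INTERRUPTED";
        }
    }

    public static void main(String[] args) {
        // Consumer replies: the sender sees the reply.
        System.out.println(outcome(request(reply -> reply.complete("ok"), 200)));
        // Consumer never replies: the sender sees a timeout failure.
        System.out.println(outcome(request(reply -> { }, 200)));
    }
}
```

In this sketch the second call fails even though nothing threw an exception on the consumer side, which matches the symptom of seeing only timeout errors in the sender's logs.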

routerFactory.addHandlerByOperationId("addViews", routingContext -> {
    RequestParameters requestParameters = routingContext.get("parsedParameters");

    vertx.eventBus()
            .send(Endpoints.API_ANALYTICS_POST,
                    Json.encodeToBuffer(requestParameters.body().getJsonObject()),
                    result -> {
                        if (result.succeeded()) {
                            successResponse(routingContext, result.result());
                        } else {
                            errorResponse(routingContext, result.cause());
                        }
                    });
});
registerFailureHandler(routerFactory, "addViews");

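We are currently using the shared event bus, not the clustered event bus. Is there any limit on the number of messages the event bus can handle?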

MessageConsumer<Buffer> views = eventBus.consumer(Endpoints.API_ANALYTICS_POST);

views.handler(message -> {
    addPostViews(message, message.body().toJsonObject());
});



private void addPostViews(Message message, JsonObject request) {
    LOGGER.info("PostViewsEvent: {0}",  request.toString());
    postMetricsService.addPostViews(request).setHandler(res -> {
        JsonObject response = new JsonObject();
        response.put("captured", true);
        if (res.succeeded()) {
            message.reply(ApiResponse.getCustomSuccessResponse(response));
        } else {
            errorResponse(message,res.cause());
        }
    });
}

public Future<Boolean> addPostViews(JsonObject views) {
    Future<Boolean> future = Future.future();
    future.complete(true);
    // mongo db query to store views
    // redis query to store views
    return future;
}