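We recently changed our APIs to use a combination of the OpenAPI web contract and the event bus to send and receive API responses for our clients.
It worked fine until today, when we noticed that most of our APIs were timing out and throwing the following error.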
API FAILURE: (TIMEOUT,-1) Timed out after waiting 30000(ms) for a reply. address: __vertx.reply.448, repliedAddress: /v1/analytics/post
at io.vertx.core.eventbus.impl.HandlerRegistration.sendAsyncResultFailure(HandlerRegistration.java:146)
at io.vertx.core.eventbus.impl.HandlerRegistration.lambda$new$0(HandlerRegistration.java:78)
at io.vertx.core.impl.VertxImpl$InternalTimerHandler.handle(VertxImpl.java:907)
at io.vertx.core.impl.VertxImpl$InternalTimerHandler.handle(VertxImpl.java:866)
at io.vertx.core.impl.ContextImpl.executeTask(ContextImpl.java:320)
at io.vertx.core.impl.EventLoopContext.lambda$executeAsync$0(EventLoopContext.java:38)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:462)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:897)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
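We can't debug this because our consumers never even receive the message they are supposed to reply to. Our logs contain only these error messages.
As an example, here is the OpenAPI web contract and event bus setup; all of our APIs are configured this way: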
routerFactory.addHandlerByOperationId("addViews", routingContext -> {
    RequestParameters requestParameters = routingContext.get("parsedParameters");
    vertx.eventBus()
        .send(Endpoints.API_ANALYTICS_POST,
            Json.encodeToBuffer(requestParameters.body().getJsonObject()),
            result -> {
                if (result.succeeded()) {
                    successResponse(routingContext, result.result());
                } else {
                    errorResponse(routingContext, result.cause());
                }
            });
});
registerFailureHandler(routerFactory, "addViews");
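We are currently using the shared event bus, not the clustered event bus. Is there any limit on the number of messages the event bus can handle?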
MessageConsumer<Buffer> views = eventBus.consumer(Endpoints.API_ANALYTICS_POST);
views.handler(message -> {
    addPostViews(message, message.body().toJsonObject());
});
private void addPostViews(Message message, JsonObject request) {
    LOGGER.info("PostViewsEvent: {0}", request.toString());
    postMetricsService.addPostViews(request).setHandler(res -> {
        JsonObject response = new JsonObject();
        response.put("captured", true);
        if (res.succeeded()) {
            message.reply(ApiResponse.getCustomSuccessResponse(response));
        } else {
            errorResponse(message, res.cause());
        }
    });
}
public Future<Boolean> addPostViews(JsonObject views) {
    Future<Boolean> future = Future.future();
    future.complete(true);
    // mongo db query to store views
    // redis query to store views
    return future;
}
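
To make the failure mode in the stack trace concrete: a reply TIMEOUT means the sender waited the full window without getting either a reply or an explicit failure back. The sketch below is a simplified, JDK-only model of that request/reply pattern, not Vert.x code. The class `ReplyTimeoutSketch`, the `request` helper and both consumers are hypothetical names invented for illustration, and a `CompletableFuture` stands in for the reply address. It only shows that any consumer code path that neither replies nor fails leaves the caller with nothing but a timeout.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;

public class ReplyTimeoutSketch {

    // Hypothetical stand-in for a request over a bus: hands the body and a
    // reply slot to the consumer, then waits up to timeoutMs for an answer.
    public static String request(BiConsumer<String, CompletableFuture<String>> consumer,
                                 String body, long timeoutMs) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        try {
            consumer.accept(body, reply);
        } catch (RuntimeException e) {
            // An exception thrown inside the consumer is not turned into a reply,
            // so the sender still sees nothing until the timeout fires.
        }
        try {
            return reply.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "TIMEOUT";
        } catch (ExecutionException e) {
            return "FAILED: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "INTERRUPTED";
        }
    }

    // Consumer with one path that returns without replying or failing.
    public static void leakyConsumer(String body, CompletableFuture<String> reply) {
        if (body.isEmpty()) {
            return; // nothing is sent back: the caller can only time out
        }
        reply.complete("captured");
    }

    // Consumer that answers on every path, with a reply or an explicit failure.
    public static void safeConsumer(String body, CompletableFuture<String> reply) {
        if (body.isEmpty()) {
            reply.completeExceptionally(new IllegalArgumentException("empty body"));
            return;
        }
        reply.complete("captured");
    }

    public static void main(String[] args) {
        System.out.println(request(ReplyTimeoutSketch::leakyConsumer, "", 200));
        System.out.println(request(ReplyTimeoutSketch::safeConsumer, "", 200));
        System.out.println(request(ReplyTimeoutSketch::safeConsumer, "{\"views\":1}", 200));
    }
}
```

In this model the leaky consumer produces only a timeout for the empty body, while the safe consumer reports a failure immediately. Whether something similar is happening in the handlers above is an open question; it is one thing that could be ruled out by logging at the very start of the consumer and on every branch that should reply.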