-1

我仍然熟悉 vert.x。来自 Spring boot 和 Spring webflux 背景,我想尝试一些我以前在 Spring 生态系统中做的基本东西。

所以我的想法是编写一个通过控制器公开的 api,它将实际工作委托给服务。我能想到在 vert.x 世界中实现这一点的唯一方法是利用事件总线。在这里,我KeyValueServiceVerticlegetKeyValues方法应该从发布者 () 获取键值列表,keyValueRepository.findAllItems().items()并通过事件总线将它们发送回原始事件发布者 api。我确实得到了预期的结果(键值列表),但不知何故我对性能不满意。我在 spring webflux 和 vert.x 的等效代码中添加了一些负载,并且我的 webflux 实现总是表现更好(更高的 RPS)。相关存储库:https ://github.com/tahniat-ashraf/spring-boot-webflux-vert.x-comparison

我是否在某处阻止了代码?有没有更好的 vert.x 方法来实现我想要实现的目标?

相关代码:

公共类 KeyValueController 扩展 AbstractVerticle {

  @Override
  public void start() throws Exception {
    Router router = Router.router(vertx);
    router
      .route()
      .handler(BodyHandler.create());
    router.route()
      .handler(LoggerHandler.create(LoggerFormat.DEFAULT));
    router
      .route(HttpMethod.GET, "/keyValues")
      .handler(this::getKeyValues);

    vertx
      .createHttpServer()
      .requestHandler(router)
      .listen(6678);
  }

  private void getKeyValues(RoutingContext routingContext) {
    vertx
      .eventBus()
      .request(KeyValueServiceVerticle.GET_LIST_ADDRESS, new JsonObject(), messageAsyncResult ->
        routingContext.response()
          .putHeader("content-type", "application/json")
          .end((String) messageAsyncResult.result().body())
      );
  }
}

public class KeyValueServiceVerticle extends AbstractVerticle {

  public static final String GET_LIST_ADDRESS = "GET_LIST_KEY_VAL";
  private KeyValueRepository keyValueRepository;
  private DynamoConfiguration dynamoConfiguration;

  @Override
  public void start() throws Exception {
    dynamoConfiguration = new DynamoConfiguration();
    keyValueRepository = new KeyValueRepository("dev-paybill-key-value", dynamoConfiguration.getDynamoDBEnhancedClient());
    var eventBus = vertx.eventBus();
    eventBus
      .consumer(KeyValueServiceVerticle.GET_LIST_ADDRESS, this::getKeyValues);

  }

  private <T> void getKeyValues(Message<T> tMessage) {

    Observable.fromPublisher(keyValueRepository.findAllItems().items())
      .toList()
      .subscribe(tList -> {
        JsonArray jsonArray=new JsonArray(tList);
        tMessage.reply(jsonArray.encodePrettily());
      });
  }
}
4

1 回答 1

2

您确定位于的发布Observable.fromPublisher(keyValueRepository.findAllItems().items())者是非阻塞发布者吗?

Rxjava2 的默认设置实际上是以阻塞方式处理事件。确保以非阻塞方式处理事件的一种方法是使用.subscribeOn(RxHelper.scheduler(vertx))运算符或配置 RxJava2 以默认使用 vertx 事件循环,而不是使用以下标准 RxJava2 线程:

RxJavaPlugins.setComputationSchedulerHandler(s -> RxHelper.scheduler(vertx));
RxJavaPlugins.setIoSchedulerHandler(s -> RxHelper.blockingScheduler(vertx));
RxJavaPlugins.setNewThreadSchedulerHandler(s -> RxHelper.scheduler(vertx));

为了使这更容易尝试以下操作

.end((String) messageAsyncResult.result().body())
// to
.end(messageAsyncResult.result().body().encode())
于 2021-03-25T07:25:59.500 回答