1

我有一个 vertx 应用程序,我在其中部署了 verticle A (HttpVerticle.java) 的多个实例和 verticle B (AerospikeVerticle.java) 的多个实例。Aerospike verticles 需要共享一个 AerospikeClient。HttpVerticle 监听端口 8888 并使用事件总线调用 AerospikeVerticle。我的问题是:

  1. 使用 sharedData 是共享单例客户端实例的正确方法吗?还有其他推荐/更清洁的方法吗?我计划在应用程序中创建和共享更多这样的单例对象(cosmos db 客户端、meterRegistry 等)。我计划使用 sharedData.localMap 以类似的方式共享它们。
  2. 是否可以使用 vertx 的事件循环作为 aerospike 客户端的支持事件循环?这样 aerospike 客户端初始化不需要创建自己的新事件循环?目前看起来 aerospike get 调用的 onRecord 部分在 aerospike 的事件循环上运行。
public class SharedAerospikeClient implements Shareable {
  public final EventLoops aerospikeEventLoops;
  public final AerospikeClient client;

  public SharedAerospikeClient() {
    EventPolicy eventPolicy = new EventPolicy();
    aerospikeEventLoops = new NioEventLoops(eventPolicy,  2 * Runtime.getRuntime().availableProcessors());
    ClientPolicy clientPolicy = new ClientPolicy();
    clientPolicy.eventLoops = aerospikeEventLoops;
    client = new AerospikeClient(clientPolicy, "localhost", 3000);
  }
}

主.java

public class Main {
  public static void main(String[] args) {
    Vertx vertx = Vertx.vertx();
    LocalMap localMap = vertx.sharedData().getLocalMap("SHARED_OBJECTS");
    localMap.put("AEROSPIKE_CLIENT", new SharedAerospikeClient());
    vertx.deployVerticle("com.demo.HttpVerticle", new DeploymentOptions().setInstances(2 * 4));
    vertx.deployVerticle("com.demo.AerospikeVerticle", new DeploymentOptions().setInstances(2 * 4));
  }
}

HttpVerticle.java

public class HttpVerticle extends AbstractVerticle {

  @Override
  public void start(Promise<Void> startPromise) throws Exception {
    vertx.createHttpServer().requestHandler(req -> {
      vertx.eventBus().request("read.aerospike", req.getParam("id"), ar -> {
        req.response()
          .putHeader("content-type", "text/plain")
          .end(ar.result().body().toString());
        System.out.println(Thread.currentThread().getName());
      });
    }).listen(8888, http -> {
      if (http.succeeded()) {
        startPromise.complete();
        System.out.println("HTTP server started on port 8888");
      } else {
        startPromise.fail(http.cause());
      }
    });
  }
}

AerospikeVerticle.java

public class AerospikeVerticle extends AbstractVerticle {
  private SharedAerospikeClient sharedAerospikeClient;

  @Override
  public void start(Promise<Void> startPromise) throws Exception {
    EventBus eventBus = vertx.eventBus();
    sharedAerospikeClient = (SharedAerospikeClient) vertx.sharedData().getLocalMap("SHARED_OBJECTS").get("AEROSPIKE_CLIENT");
    MessageConsumer<String> consumer = eventBus.consumer("read.aerospike");
    consumer.handler(this::getRecord);
    System.out.println("Started aerospike verticle");
    startPromise.complete();
  }

  public void getRecord(Message<String> message) {
    sharedAerospikeClient.client.get(
      sharedAerospikeClient.aerospikeEventLoops.next(),
      new RecordListener() {
        @Override
        public void onSuccess(Key key, Record record) {
            if (record != null) {
              String result = record.getString("value");
              message.reply(result);
            } else {
              message.reply("not-found");
            }
        }

        @Override
        public void onFailure(AerospikeException exception) {
            message.reply("error");
        }
      },
      sharedAerospikeClient.client.queryPolicyDefault,
      new Key("myNamespace", "mySet", message.body())
    );
  }
}
4

1 回答 1

4

我不知道 Aerospike 客户端。

关于在verticles之间共享对象,共享数据映射确实是为此目的而设计的。

但是,更容易:

  1. 在您的主类或自定义启动器中创建共享客户端
  2. 提供客户端作为 Verticle 构造函数的参数

Vertx接口有一个deployVerticle(Supplier<Verticle>, DeploymentOptions)在这种情况下很方便的方法:

MySharedClient client = initSharedClient();
vertx.deploy(() -> new SomeVerticle(client), deploymentOptions);
于 2021-11-09T09:27:13.757 回答