我有一个 vertx 应用程序,我在其中部署了 verticle A (HttpVerticle.java) 的多个实例和 verticle B (AerospikeVerticle.java) 的多个实例。Aerospike verticles 需要共享一个 AerospikeClient。HttpVerticle 监听端口 8888 并使用事件总线调用 AerospikeVerticle。我的问题是:
- 使用 sharedData 是共享单例客户端实例的正确方法吗?是否有其他推荐的、更简洁的做法?我计划在应用程序中创建并共享更多这样的单例对象(Cosmos DB 客户端、MeterRegistry 等),并打算同样通过 sharedData.localMap 来共享它们。
- 是否可以把 vertx 的事件循环用作 aerospike 客户端的底层事件循环?这样初始化 aerospike 客户端时就不需要再创建它自己的事件循环。目前看起来 aerospike get 调用的 onRecord 部分是在 aerospike 的事件循环上运行的。
/**
 * Holder for one {@link AerospikeClient} and the event loops it was configured with,
 * intended to be stored in a Vert.x local map (hence {@code Shareable}) and shared
 * between verticle instances.
 *
 * <p>The client connects to {@code localhost:3000} and uses
 * {@code 2 * availableProcessors} NIO event loops.
 *
 * <p>Call {@link #close()} on application shutdown to release the client and its
 * event loops.
 */
public class SharedAerospikeClient implements Shareable, AutoCloseable {
    /** Event loops assigned to the client's policy; async commands pick one via {@code next()}. */
    public final EventLoops aerospikeEventLoops;

    /** The shared client instance. */
    public final AerospikeClient client;

    /**
     * Creates the event loops and then the client.
     *
     * <p>If the client cannot be created, the event loops that were just started are
     * closed before the exception is rethrown, so a failed construction does not
     * leave them behind.
     */
    public SharedAerospikeClient() {
        EventPolicy eventPolicy = new EventPolicy();
        EventLoops loops = new NioEventLoops(eventPolicy, 2 * Runtime.getRuntime().availableProcessors());
        ClientPolicy clientPolicy = new ClientPolicy();
        clientPolicy.eventLoops = loops;

        AerospikeClient connected;
        try {
            connected = new AerospikeClient(clientPolicy, "localhost", 3000);
        } catch (RuntimeException e) {
            // Nobody else holds a reference to the loops yet, so release them here.
            loops.close();
            throw e;
        }

        aerospikeEventLoops = loops;
        client = connected;
    }

    /**
     * Closes the client first and then its event loops; the loops are closed even if
     * closing the client throws.
     */
    @Override
    public void close() {
        try {
            client.close();
        } finally {
            aerospikeEventLoops.close();
        }
    }
}
Main.java
/**
 * Application entry point: creates the Vert.x instance, publishes the shared
 * Aerospike client in the {@code SHARED_OBJECTS} local map, then deploys the HTTP
 * and Aerospike verticles.
 */
public class Main {
    /**
     * Boots the application.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Vertx vertx = Vertx.vertx();

        // The shared client must be in the map before any AerospikeVerticle starts,
        // because each instance looks it up in its start() method.
        LocalMap<String, SharedAerospikeClient> localMap = vertx.sharedData().getLocalMap("SHARED_OBJECTS");
        localMap.put("AEROSPIKE_CLIENT", new SharedAerospikeClient());

        deploy(vertx, "com.demo.HttpVerticle", 2 * 4);
        deploy(vertx, "com.demo.AerospikeVerticle", 2 * 4);
    }

    /**
     * Deploys {@code instances} instances of the named verticle and reports a failed
     * deployment on stderr instead of dropping the result.
     */
    private static void deploy(Vertx vertx, String verticleName, int instances) {
        vertx.deployVerticle(verticleName, new DeploymentOptions().setInstances(instances), deployed -> {
            if (deployed.failed()) {
                System.err.println("Failed to deploy " + verticleName + ": " + deployed.cause());
            }
        });
    }
}
HttpVerticle.java
/**
 * HTTP front end: listens on port 8888 and answers each request by sending the
 * {@code id} query parameter to the {@code read.aerospike} event bus address and
 * writing the reply back as plain text.
 */
public class HttpVerticle extends AbstractVerticle {
    /**
     * Starts the HTTP server and completes {@code startPromise} once it is listening
     * (or fails it with the bind error).
     *
     * @param startPromise completed or failed according to the outcome of {@code listen}
     */
    @Override
    public void start(Promise<Void> startPromise) throws Exception {
        vertx.createHttpServer().requestHandler(req -> {
            String id = req.getParam("id");
            if (id == null) {
                // Without an id there is nothing to look up; answer right away
                // instead of sending a null message over the event bus.
                req.response()
                    .setStatusCode(400)
                    .putHeader("content-type", "text/plain")
                    .end("missing id parameter");
                return;
            }
            vertx.eventBus().request("read.aerospike", id, ar -> {
                if (ar.failed()) {
                    // ar.result() is null on failure (no consumer, timeout, ...);
                    // always end the response so the HTTP client is not left waiting.
                    req.response()
                        .setStatusCode(500)
                        .putHeader("content-type", "text/plain")
                        .end("error");
                } else {
                    Object body = ar.result().body();
                    req.response()
                        .putHeader("content-type", "text/plain")
                        .end(body == null ? "" : body.toString());
                }
                // Debug output: shows which thread handled the reply.
                System.out.println(Thread.currentThread().getName());
            });
        }).listen(8888, http -> {
            if (http.succeeded()) {
                startPromise.complete();
                System.out.println("HTTP server started on port 8888");
            } else {
                startPromise.fail(http.cause());
            }
        });
    }
}
AerospikeVerticle.java
/**
 * Event bus worker for the {@code read.aerospike} address: looks up the record whose
 * key is the message body and replies with its {@code value} bin.
 *
 * <p>Replies are plain strings: the bin value, {@code "not-found"}, or {@code "error"}.
 */
public class AerospikeVerticle extends AbstractVerticle {
    // Shared holder fetched from the SHARED_OBJECTS local map in start().
    private SharedAerospikeClient sharedAerospikeClient;

    /**
     * Fetches the shared client and registers the {@code read.aerospike} consumer.
     *
     * @param startPromise failed if the shared client has not been published,
     *                     completed once the consumer handler is set
     */
    @Override
    public void start(Promise<Void> startPromise) throws Exception {
        EventBus eventBus = vertx.eventBus();
        sharedAerospikeClient = (SharedAerospikeClient) vertx.sharedData().getLocalMap("SHARED_OBJECTS").get("AEROSPIKE_CLIENT");
        if (sharedAerospikeClient == null) {
            // Fail the deployment now rather than hitting a NullPointerException on every message.
            startPromise.fail("AEROSPIKE_CLIENT is missing from the SHARED_OBJECTS local map");
            return;
        }
        MessageConsumer<String> consumer = eventBus.consumer("read.aerospike");
        consumer.handler(this::getRecord);
        System.out.println("Started aerospike verticle");
        startPromise.complete();
    }

    /**
     * Asynchronously reads the record keyed by the message body from
     * {@code myNamespace}/{@code mySet} and replies to the message.
     *
     * <p>NOTE(review): the listener callbacks presumably run on the Aerospike event
     * loop thread rather than the verticle's context - confirm if that matters.
     *
     * @param message request whose body is the record's user key
     */
    public void getRecord(Message<String> message) {
        try {
            sharedAerospikeClient.client.get(
                sharedAerospikeClient.aerospikeEventLoops.next(),
                new RecordListener() {
                    @Override
                    public void onSuccess(Key key, Record record) {
                        String result = (record != null) ? record.getString("value") : null;
                        // A missing record and a record without a "value" bin both
                        // answer "not-found", so the requester never gets a null body.
                        message.reply(result != null ? result : "not-found");
                    }

                    @Override
                    public void onFailure(AerospikeException exception) {
                        message.reply("error");
                    }
                },
                // Single-record read: use the client's default read policy, not the query policy.
                sharedAerospikeClient.client.readPolicyDefault,
                new Key("myNamespace", "mySet", message.body())
            );
        } catch (AerospikeException e) {
            // Thrown before the command was handed to the listener (e.g. an invalid
            // key); reply anyway so the requester does not wait for its timeout.
            message.reply("error");
        }
    }
}