1

我正在尝试开发一个 REST 资源,通过事件总线将请求转发给负责的服务。然后该服务尝试使用 JavaRX 从 Elastic Search 异步获取一些数据。

我正在io.reactiverse为 Vert.x 使用 ElasticSearch 客户端实现

我不知道如何将 ElasticSearch 数据返回给客户端

弹性资源

import io.reactivex.Single;
import io.vertx.core.json.JsonObject;
import io.vertx.reactivex.core.eventbus.EventBus;
import io.vertx.reactivex.core.eventbus.Message;
import org.elasticsearch.action.get.GetResponse;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

@Path("/elastic")
@ApplicationScoped
public class ElasticResource {

    @Inject
    EventBus eventBus;

    @GET
    @Produces(MediaType.APPLICATION_JSON)
    @Path("bank-es")
    public void greetingVertx(@Suspended final AsyncResponse inAsyncResponse) {

        Single<Message<GetResponse>> single = eventBus.<GetResponse>rxSend("QuarkusElasticService.getReq", new JsonObject().put("index", "bank").put("id", "1"));

        single.subscribe((mex) -> {
            inAsyncResponse.resume(Response.ok(mex.body()).build());
        });
    }
}

QuarkusElasticServiceImpl

import com.sourcesense.sisal.socialbetting.dev.example.elastic.service.QuarkusElasticService;
import io.quarkus.vertx.ConsumeEvent;
import io.reactiverse.elasticsearch.client.reactivex.RestHighLevelClient;
import io.reactivex.Single;
import io.vertx.core.json.JsonObject;
import io.vertx.reactivex.core.Vertx;
import org.apache.http.HttpHost;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;

import javax.annotation.PostConstruct;
import javax.inject.Inject;
import java.util.concurrent.ExecutorService;

public class QuarkusElasticServiceImpl implements QuarkusElasticService {

    @Inject
    Vertx vertx;

    @Inject
    ExecutorService executor;

    private RestHighLevelClient esClient;

    @PostConstruct
    public void init() {
        RestClientBuilder builder = RestClient.builder(
                new HttpHost("localhost", 9200, "http"),
                new HttpHost("localhost", 9201, "http"));
        esClient = RestHighLevelClient.create(vertx, builder);

    }


    @Override
    @ConsumeEvent("QuarkusElasticService.getReq")
    public Single getReq(JsonObject jsonObject) {

        GetRequest getRequest = new GetRequest(
                jsonObject.getString("index"),
                jsonObject.getString("id"));

        return esClient.rxGetAsync(getRequest, RequestOptions.DEFAULT);
    }
}
4

1 回答 1

1

在对克莱门特的评论进行推理后,我找到了解决方案。

首先,我在 2io.reactiverse的模块之间切换,选择了非 RxJava 版本io.reactiverse.elasticsearch-client

然后我回到了andio.vertx.axle的版本。EventBusMessage

然后我改变了我的代码如下:

弹性资源

import io.vertx.axle.core.eventbus.EventBus;
import io.vertx.core.json.JsonObject;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import java.util.concurrent.ExecutionException;

@Path("/elastic")
@ApplicationScoped
public class ElasticResource {

    @Inject
    EventBus eventBus;

    @GET
    @Produces(MediaType.APPLICATION_JSON)
    @Path("bank-es")
    public JsonObject greetingVertx() throws ExecutionException, InterruptedException {

        JsonObject req = new JsonObject().put("index", "bank").put("id", "1");

        return eventBus.<JsonObject>send("QuarkusElasticService.getReq", req)
                .toCompletableFuture().get().body();
    }
}

QuarkusElasticServiceImpl

import com.sourcesense.sisal.socialbetting.dev.example.elastic.service.QuarkusElasticService;
import io.quarkus.vertx.ConsumeEvent;
import io.reactiverse.elasticsearch.client.RestHighLevelClient;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import org.apache.http.HttpHost;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;

import javax.annotation.PostConstruct;
import javax.inject.Inject;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class QuarkusElasticServiceImpl implements QuarkusElasticService {

    @Inject
    Vertx vertx;

    private RestHighLevelClient esClient;

    @PostConstruct
    public void init() {

        RestClientBuilder builder = RestClient.builder(
                new HttpHost("localhost", 9200, "http"),
                new HttpHost("localhost", 9201, "http"));
        esClient = RestHighLevelClient.create(vertx, builder);

    }


    @Override
    @ConsumeEvent("QuarkusElasticService.getReq")
    public CompletionStage<JsonObject> getReq(JsonObject jsonObject) {

        CompletableFuture future = new CompletableFuture();

        GetRequest getRequest = new GetRequest(
                jsonObject.getString("index"),
                jsonObject.getString("id"));

        esClient.getAsync(getRequest, RequestOptions.DEFAULT, ar -> {
            if (ar.failed()) {
                future.completeExceptionally(new Exception("erroraccio"));
            } else {
                future.complete(JsonObject.mapFrom(ar.result()));
            }
        });

        return future;
    }
}
于 2019-05-31T11:06:37.233 回答