2

更新!

在解决了一些与主要问题无关的问题后,我修复了示例代码中的小错误,主要问题仍然是关于服务之间的非阻塞流。

背景资料

我在 Quarkus 下移植了一个 Spring WebFlux 服务。该服务对多个大型数据集运行长时间搜索,并在可用时以 Flux(文本/事件流)的形式返回部分结果。

问题

现在我正在尝试在 Quarkus 下将 Mutiny Multi 与 Vert.x 一起使用,但无法弄清楚消费者服务如何在不阻塞的情况下接收此流。

在所有示例中,消费者要么是 JS 前端页面,要么生产者的内容类型是 application/json,这似乎在 Multi 完成之前一直在发送,然后再将其发送到一个 JSON 对象中(这在我的应用程序中没有意义)。

问题

  1. 如何使用 Mutiny 风格的 Vert.x WebClient 接收文本/事件流?
  2. 如果问题是 WebClient 无法接收连续流:在两个 Quarkus 服务之间流式传输数据的标准方式是什么?

这是一个简化的例子

测试实体

public class SearchResult implements Serializable {

    private String content;

    public SearchResult(String content) {
        this.content = content;
    }


    //.. toString, getters and setters
    
}

生产者 1. 简单的无限流 -> 挂起

@GET
@Path("/search")
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType(MediaType.APPLICATION_JSON)
public Multi<SearchResult> getResults() {
        return Multi.createFrom().ticks().every(Duration.ofSeconds(2)              .onItem().transform(n -> new SearchResult(n.toString()));
}

Producer 2. 使用 Vertx Paths 无限流 -> 挂起

@Route(path = "/routed", methods = HttpMethod.GET)
public Multi<SearchResult> getSrStreamRouted(RoutingContext context) {
        log.info("routed run");
        return ReactiveRoutes.asEventStream(Multi.createFrom().ticks().every(Duration.ofSeconds(2))
                .onItem().transform(n -> new SearchResult(n.toString()));
}

Producer 3. 简单的有限流 -> 阻塞直到完成

@GET
@Path("/search")
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType(MediaType.APPLICATION_JSON)
public Multi<SearchResult> getResults() {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(2))
        .transform().byTakingFirstItems(5)
        .onItem().transform(n -> new SearchResult(n.toString()));
}

消费者

在生产者和消费者方面尝试了多种不同的解决方案,但在每种情况下,流都会阻塞,直到它完成或无限期挂起,而不为无限流传输数据。我用 httpie 得到了相同的结果。这是最新的迭代:

WebClientOptions webClientOptions = new WebClientOptions().setDefaultHost("localhost").setDefaultPort(8182);
WebClient client = WebClient.create(vertx, webClientOptions);
        
client.get("/string")
                .send()
                .onFailure().invoke(resp -> log.error("error: " + resp))
                .onItem().invoke(resp -> log.info("result: " + resp.statusCode()))
                .toMulti()
                .subscribe().with(r -> log.info(String.format("Subscribe: code:%d body:%s",r.statusCode(), r.bodyAsString())));
4

1 回答 1

4

Vert.x Web 客户端不适用于 SSE(无配置)。来自https://vertx.io/docs/vertx-web-client/java/

响应完全缓冲,使用 BodyCodec.pipe 将响应通过管道传输到写入流

它一直等到响应完成。您可以使用原始Vert.x HTTP 客户端或使用pipe编解码器。示例在https://vertx.io/docs/vertx-web-client/java/#_decoding_responses上给出。

或者,您可以使用 SSE 客户端,例如: https ://github.com/quarkusio/quarkus-quickstarts/blob/master/kafka-quickstart/src/test/java/org/acme/kafka/PriceResourceTest.java# L27-L34

于 2020-08-31T06:02:54.170 回答