3

我正在尝试将新的 Spring WebFlux 框架与 kotlin 一起使用。而且我找不到此代码(myService)的错误所在:

fun foo(): Flux<ByteArray> {
    val client = WebClient.create("http://byte-array-service")
    return client
            .get()
            .uri("/info")
            .accept(MediaType.APPLICATION_OCTET_STREAM)
            .exchange()
            .flatMapMany {
                r -> r.bodyToFlux(ByteArray::class.java)
            }
}

此方法返回 7893 字节的 Flux,我知道字节数组服务发送的字节并非全部。如果我使用旧式休息模板一切都可以

fun foo(): Flux<ByteArray> {
    val rt = RestTemplate()
    rt.messageConverters.add(
            ByteArrayHttpMessageConverter())
    val headers = HttpHeaders()
    headers.accept = listOf(MediaType.APPLICATION_OCTET_STREAM)

    val entity = HttpEntity<String>(headers)
    val r = rt.exchange("http://byte-array-service/info", HttpMethod.GET,entity, ByteArray::class.java)
    return Flux.just(r.body)
}

它返回从 byte-array-service 发送的所有 274124 个字节

这是我的消费者

fun doReadFromByteArrayService(req: ServerRequest): Mono<ServerResponse> {

    return Mono.from(myService
            .foo()
            .flatMap {
                accepted().body(fromObject(it.size))
            })
}
4

1 回答 1

1

如果我正确理解了您的问题,并且您只需要向前传递流量,那么这应该可行。我在自己的环境中对其进行了测试,读取所有字节都没有问题。

获取字节:

fun foo(): Flux<ByteArray> =
    WebClient.create("http://byte-array-service")
        .get()
        .uri("/info")
        .accept(MediaType.APPLICATION_OCTET_STREAM)
        .retrieve()
        .bodyToFlux(ByteArray::class.java)

返回带有响应的字节:

fun doReadFromByteArrayService(req: ServerRequest): Mono<ServerResponse> =
        ServerResponse.ok().body(foo())  
于 2018-02-07T14:43:47.437 回答