34

我正在使用来自 spring webflux 的 webclient,如下所示:

WebClient.create()
            .post()
            .uri(url)
            .syncBody(body)
            .accept(MediaType.APPLICATION_JSON)
            .headers(headers)
            .exchange()
            .flatMap(clientResponse -> clientResponse.bodyToMono(tClass));

它运作良好。我现在想处理来自我正在调用的 Web 服务的错​​误(Ex 500 内部错误)。通常我会在“流”上添加一个 doOnError 并使用 Throwable 来测试状态代码,

但我的问题是我想获取 web 服务提供的正文,因为它为我提供了我想使用的消息。

无论发生什么,我都希望执行 flatMap 并测试自己的状态代码以反序列化或不反序列化主体。

4

12 回答 12

27

我更喜欢使用 ClientResponse 提供的方法来处理 http 错误和抛出异常:

WebClient.create()
         .post()
         .uri( url )
         .body( bodyObject == null ? null : BodyInserters.fromValue( bodyObject ) )
         .accept( MediaType.APPLICATION_JSON )
         .headers( headers )
         .exchange()
         .flatMap( clientResponse -> {
             //Error handling
             if ( clientResponse.statusCode().isError() ) { // or clientResponse.statusCode().value() >= 400
                 return clientResponse.createException().flatMap( Mono::error );
             }
             return clientResponse.bodyToMono( clazz )
         } )
         //You can do your checks: doOnError (..), onErrorReturn (..) ...
         ...

实际上,这与 DefaultWebClient 的 DefaultResponseSpec 中用于处理错误的逻辑相同。DefaultResponseSpec 是 ResponseSpec 的一个实现,如果我们使用retrieve()而不是exchange(),我们将拥有它。

于 2020-02-28T13:50:12.593 回答
26

我们没有onStatus()吗?

    public Mono<Void> cancel(SomeDTO requestDto) {
        return webClient.post().uri(SOME_URL)
                .body(fromObject(requestDto))
                .header("API_KEY", properties.getApiKey())
                .retrieve()
                .onStatus(HttpStatus::isError, response -> {
                    logTraceResponse(log, response);
                    return Mono.error(new IllegalStateException(
                            String.format("Failed! %s", requestDto.getCartId())
                    ));
                })
                .bodyToMono(Void.class)
                .timeout(timeout);
    }

和:

    public static void logTraceResponse(Logger log, ClientResponse response) {
        if (log.isTraceEnabled()) {
            log.trace("Response status: {}", response.statusCode());
            log.trace("Response headers: {}", response.headers().asHttpHeaders());
            response.bodyToMono(String.class)
                    .publishOn(Schedulers.elastic())
                    .subscribe(body -> log.trace("Response body: {}", body));
        }
    }
于 2020-06-03T15:02:53.647 回答
16

你也可以这样做

return webClient.getWebClient()
 .post()
 .uri("/api/Card")
 .body(BodyInserters.fromObject(cardObject))
 .exchange()
 .flatMap(clientResponse -> {
     if (clientResponse.statusCode().is5xxServerError()) {
        clientResponse.body((clientHttpResponse, context) -> {
           return clientHttpResponse.getBody();
        });
     return clientResponse.bodyToMono(String.class);
   }
   else
     return clientResponse.bodyToMono(String.class);
});

阅读这篇文章以获得更多示例链接,当我遇到类似的错误处理问题时,我发现它很有帮助

于 2018-03-26T19:52:20.103 回答
14

我通过这样做得到了错误主体:

webClient
...
.retrieve()    
.onStatus(HttpStatus::isError, response -> response.bodyToMono(String.class) // error body as String or other class
                                                   .flatMap(error -> Mono.error(new RuntimeException(error)))) // throw a functional exception
.bodyToMono(MyResponseType.class)
.block();
于 2020-10-29T14:57:25.033 回答
7

我做这样的事情:

Mono<ClientResponse> responseMono = requestSpec.exchange()
            .doOnNext(response -> {
                HttpStatus httpStatus = response.statusCode();
                if (httpStatus.is4xxClientError() || httpStatus.is5xxServerError()) {
                    throw new WebClientException(
                            "ClientResponse has erroneous status code: " + httpStatus.value() +
                                    " " + httpStatus.getReasonPhrase());
                }
            });

接着:

responseMono.subscribe(v -> { }, ex -> processError(ex));
于 2017-06-16T15:44:05.617 回答
7

请注意,在撰写本文时,5xx 错误不再导致来自底层 Netty 层的异常。见https://github.com/spring-projects/spring-framework/commit/b0ab84657b712aac59951420f4e9d696c3d84ba2

于 2017-07-06T11:43:19.667 回答
4

我刚刚遇到了类似的情况,我发现 webClient 不会抛出任何异常,即使它得到 4xx/5xx 响应。就我而言,我首先使用 webclient 进行调用以获取响应,如果它返回 2xx 响应,那么我从响应中提取数据并将其用于进行第二次调用。如果第一次调用得到非 2xx 响应,则抛出异常。因为它没有抛出异常,所以当第一次调用失败并且第二次调用仍然进行时。所以我所做的是

return webClient.post().uri("URI")
    .header(HttpHeaders.CONTENT_TYPE, "XXXX")
    .header(HttpHeaders.ACCEPT, "XXXX")
    .header(HttpHeaders.AUTHORIZATION, "XXXX")
    .body(BodyInserters.fromObject(BODY))
    .exchange()
    .doOnSuccess(response -> {
        HttpStatus statusCode = response.statusCode();
        if (statusCode.is4xxClientError()) {
            throw new Exception(statusCode.toString());
        }
        if (statusCode.is5xxServerError()) {
            throw new Exception(statusCode.toString());
        }
    )
    .flatMap(response -> response.bodyToMono(ANY.class))
    .map(response -> response.getSomething())
    .flatMap(something -> callsSecondEndpoint(something));
}
于 2019-08-23T14:47:37.980 回答
3

我们终于明白发生了什么:默认情况下,Netty 的 httpclient (HttpClientRequest) 配置为在服务器错误(响应 5XX)而不是客户端错误(4XX)时失败,这就是它总是发出异常的原因。

我们所做的是扩展 AbstractClientHttpRequest 和 ClientHttpConnector 来配置 httpclient 的行为方式,当我们调用 WebClient 时,我们使用我们的自定义 ClientHttpConnector :

 WebClient.builder().clientConnector(new CommonsReactorClientHttpConnector()).build();
于 2017-06-23T07:25:36.560 回答
2

每当收到状态码为 4xx 或 5xx 的响应时,WebClient 中的 retrieve() 方法就会引发 WebClientResponseException。

您可以通过检查响应状态代码来处理异常。

   Mono<Object> result = webClient.get().uri(URL).exchange().log().flatMap(entity -> {
        HttpStatus statusCode = entity.statusCode();
        if (statusCode.is4xxClientError() || statusCode.is5xxServerError())
        {
            return Mono.error(new Exception(statusCode.toString()));
        }
        return Mono.just(entity);
    }).flatMap(clientResponse -> clientResponse.bodyToMono(JSONObject.class))

参考: https ://www.callicoder.com/spring-5-reactive-webclient-webtestclient-examples/

于 2021-05-05T12:37:24.103 回答
1

使用我学到的关于“使用 Reactor 抛出异常的正确方法”这个奇妙的 SO 答案,我能够将这个答案放在一起。它使用.onStatus.bodyToMono.handle将错误响应正文映射到异常。

// create a chicken
webClient
    .post()
    .uri(urlService.getUrl(customer) + "/chickens")
    .contentType(MediaType.APPLICATION_JSON)
    .body(Mono.just(chickenCreateDto), ChickenCreateDto.class) // outbound request body
    .retrieve()
    .onStatus(HttpStatus::isError, clientResponse ->
        clientResponse.bodyToMono(ChickenCreateErrorDto.class)
            .handle((error, sink) -> 
                sink.error(new ChickenException(error))
            )
    )
    .bodyToMono(ChickenResponse.class)
    .subscribe(
            this::recordSuccessfulCreationOfChicken, // accepts ChickenResponse
            this::recordUnsuccessfulCreationOfChicken // accepts throwable (ChickenException)
    );
于 2021-10-04T22:20:54.273 回答
0

我偶然发现了这个,所以我想我不妨发布我的代码。

我所做的是创建一个全局处理程序,负责处理来自 Web 客户端的请求和响应错误。这是在 Kotlin 中,但当然可以很容易地转换为 Java。这扩展了默认行为,因此您可以确保在客户处理之上获得所有自动配置。

正如您所看到的,这并没有真正做任何自定义,它只是将 Web 客户端错误转换为相关响应。对于响应错误,代码和响应正文只是简单地传递给客户端。对于当前的请求错误,它只处理连接问题,因为这就是我所关心的(目前),但正如您所见,它可以轻松扩展。

@Configuration
class WebExceptionConfig(private val serverProperties: ServerProperties) {

    @Bean
    @Order(-2)
    fun errorWebExceptionHandler(
        errorAttributes: ErrorAttributes,
        resourceProperties: ResourceProperties,
        webProperties: WebProperties,
        viewResolvers: ObjectProvider<ViewResolver>,
        serverCodecConfigurer: ServerCodecConfigurer,
        applicationContext: ApplicationContext
    ): ErrorWebExceptionHandler? {
        val exceptionHandler = CustomErrorWebExceptionHandler(
            errorAttributes,
            (if (resourceProperties.hasBeenCustomized()) resourceProperties else webProperties.resources) as WebProperties.Resources,
            serverProperties.error,
            applicationContext
        )
        exceptionHandler.setViewResolvers(viewResolvers.orderedStream().collect(Collectors.toList()))
        exceptionHandler.setMessageWriters(serverCodecConfigurer.writers)
        exceptionHandler.setMessageReaders(serverCodecConfigurer.readers)
        return exceptionHandler
    }
}

class CustomErrorWebExceptionHandler(
    errorAttributes: ErrorAttributes,
    resources: WebProperties.Resources,
    errorProperties: ErrorProperties,
    applicationContext: ApplicationContext
)  : DefaultErrorWebExceptionHandler(errorAttributes, resources, errorProperties, applicationContext) {

    override fun handle(exchange: ServerWebExchange, throwable: Throwable): Mono<Void> =
        when (throwable) {
            is WebClientRequestException -> handleWebClientRequestException(exchange, throwable)
            is WebClientResponseException -> handleWebClientResponseException(exchange, throwable)
            else -> super.handle(exchange, throwable)
        }

    private fun handleWebClientResponseException(exchange: ServerWebExchange, throwable: WebClientResponseException): Mono<Void> {
        exchange.response.headers.add("Content-Type", "application/json")
        exchange.response.statusCode = throwable.statusCode

        val responseBodyBuffer = exchange
            .response
            .bufferFactory()
            .wrap(throwable.responseBodyAsByteArray)

        return exchange.response.writeWith(Mono.just(responseBodyBuffer))
    }

    private fun handleWebClientRequestException(exchange: ServerWebExchange, throwable: WebClientRequestException): Mono<Void> {
        if (throwable.rootCause is ConnectException) {

            exchange.response.headers.add("Content-Type", "application/json")
            exchange.response.statusCode = HttpStatus.BAD_GATEWAY

            val responseBodyBuffer = exchange
                .response
                .bufferFactory()
                .wrap(ObjectMapper().writeValueAsBytes(customErrorWebException(exchange, HttpStatus.BAD_GATEWAY, throwable.message)))

            return exchange.response.writeWith(Mono.just(responseBodyBuffer))

        } else {
            return super.handle(exchange, throwable)
        }
    }

    private fun customErrorWebException(exchange: ServerWebExchange, status: HttpStatus, message: Any?) =
        CustomErrorWebException(
            Instant.now().toString(),
            exchange.request.path.value(),
            status.value(),
            status.reasonPhrase,
            message,
            exchange.request.id
        )
}

data class CustomErrorWebException(
    val timestamp: String,
    val path: String,
    val status: Int,
    val error: String,
    val message: Any?,
    val requestId: String,
)
于 2021-06-28T15:56:46.520 回答
0

实际上,您可以在onError通话中轻松记录正文:

            .doOnError {
                logger.warn { body(it) }
            }

和:

    private fun body(it: Throwable) =
        if (it is WebClientResponseException) {
            ", body: ${it.responseBodyAsString}"
        } else {
            ""
        }
于 2022-03-05T16:35:43.973 回答