2

如果服务出现任何异常,我想重试。但是当使用 retryWhen 时出现异常 java.lang.IllegalStateException: UnicastProcessor 只允许单个订阅者。

无需重试,它工作正常

Flux.window(10)
    .flatMap(
          windowedFlux -> 
         webclient.post().uri(url)
        .body(BodyInserters.fromPublisher(windowedFlux, Request.class))
        .exchange()
        .doOnNext(ordRlsResponse -> {
                     if( ordRlsResponse.statusCode().is2xxSuccessful()) {                               
                        Mono<ResponseEntity<Response>> response = ordRlsResponse.toEntity(Response.class);              
                        //doing some processing here             
                     }           
                     else {                  
                        throw new CustomeException(errmsg);              
                     }

        }).retryWhen(retryStrategy)).subscribe();

retryStrategy的定义如下:

Retry retryStrategy = Retry.fixedDelay((long)5, Duration.ofSeconds((long)5)) 
                         .filter(exception -> exception instanceof CustomeException) 
                         .doAfterRetry( exception -> log.info("Retry attempted"))
4

0 回答 0