12

我有以下代码重新调整 Mono<Foo>:

try {
    return userRepository.findById(id)  // step 1
        .flatMap(user -> barRepository.findByUserId( user.getId())  // step 2
        .map(bar-> Foo.builder().msg("Already exists").build())  // step 3
            .switchIfEmpty(barRepository.save(Bar.builder().userId(user.getId()).build())  // step 4
                .map(bar-> Foo.builder().msg("Created").build())   // step 5 
            ))
            .doOnError(throwable -> Mono.just(handleError(throwable)));
    } catch(Exception e) {

        log.error("from catch block");
        return Mono.just(handleError(e));

    }

如果在第 1 步发生错误(例如,指定 id 的用户不存在),它会被 doOnError 或 try catch 块捕获,还是这两者都没有?

如果在第 2 步、第 3 步、第 4 步中发生错误,则同样的问题。

什么是正确的代码,这样错误总是被 doOnError 捕获并消除 try catch?

public interface UserRepository extends ReactiveMongoRepository<User, String>对 barRepository 使用相同的。

handleError(throwable) 只是简单地执行 log.error(e.getMessage() 并重新调整 Foo。

4

6 回答 6

6

我认为第一个错误在标题中:“Mono or Flux”与错误处理无关。

  • Mono最多只能发出一个项目(流一个元素)
  • Flux可以发出更复杂的东西(即 List)

要处理错误,您可以按照以下示例进行操作:

return webClient.get()
                .uri(url)
                .retrieve()
                .bodyToMono(ModelYouAreRetrieving.class)
                .doOnError(throwable -> logger.error("Failed for some reason", throwable))
                .onErrorReturn(new ModelYouAreRetrieving(...))
                .block();
于 2020-11-12T14:18:50.473 回答
5

DoOnError 只会执行副作用,并假设 findById 将返回 Mono.Error() 如果它失败,这样的事情应该可以工作。

return userRepository.findById(id)
    .flatMap ( user -> 
        barRepository.findByUserId(user.getId())
        .map((user,bar)-> Foo.builder().msg("Already exists").build())  
        .switchIfEmpty(barRepository.save(Bar.builder().userId(user.getId()).build())
        .map(bar-> Foo.builder().msg("Created").build())

    ))
    .onErrorReturn(throwable -> Mono.just(handleError(throwable)));

仅当您调用链的阻塞操作或在进入反应链之前发生运行时错误时,try catch 才会起作用。这些doOn操作不会修改链,它们仅用于副作用。由于flatMap需要一个生产者,因此您需要从调用中返回一个 Mono,在这种情况下,如果发生错误,那么它只会传播错误。在所有反应链中,除非另有处理,否则错误将传播。

于 2018-06-25T15:17:38.423 回答
4

使用Exceptions.propagate(e)它将检查的异常包装成一个特殊的运行时异常,可以由onError

下面的代码试图覆盖大写的用户属性。现在,当它遇到 kyle 时,会抛出检查的异常,并从返回 MIKEonErrorReturn

@Test
void Test19() {
    Flux.fromIterable(Arrays.asList(new User("jhon", "10000"),
            new User("kyle", "bot")))
        .map(x -> {
            try {
                return toUpper(x);
            } catch (TestException e) {
                throw Exceptions.propagate(e);
            }
        })
        .onErrorReturn(new User("MIKE", "BOT")).subscribe(x -> System.out.println(x));
}

protected final class TestException extends Exception {
    private static final long serialVersionUID = -831485594512095557L;
}

private User toUpper(User user) throws TestException{
    if (user.getName().equals("kyle")) {
        throw new TestException();
    }
    return new User(user.getName().toUpperCase(), user.getProfession().toUpperCase());
}

输出

User [name=JHON, profession=10000]
User [name=MIKE, profession=BOT]
于 2020-05-03T19:15:13.040 回答
2

@Gianluca Pinto 的最后一行代码也不正确。代码不会被编译。onErrorReturn 不适合复杂的错误处理。你应该使用的是 onErrorResume。

见:https ://grokonez.com/reactive-programming/reactor/reactor-handle-error#21_By_falling_back_to_another_Flux

onErrorResume 将回退到另一个 Flux 并让您捕获和管理先前 Flux 抛出的异常。如果查看onErrorReturn的实现,你会发现onErrorReturn实际上是在使用onErrorResume。

所以这里的代码应该是:

.onErrorResume(throwable -> Mono.just(handleError(throwable)));
于 2018-11-28T16:00:08.507 回答
0

@James Ralston 代码的最后一行是错误的。正确的代码应该是:

return userRepository.findById(id)
.flatMap ( user -> 
    barRepository.findByUserId(user.getId())
    .map((user,bar)-> Foo.builder().msg("Already exists").build())  
    .switchIfEmpty(barRepository.save(Bar.builder().userId(user.getId()).build())
    .map(bar-> Foo.builder().msg("Created").build())

))
.onErrorReturn(Mono.just(handleError(throwable)));
于 2018-11-08T15:23:03.120 回答
0

在创建反应流时,我们需要使用 onError*,因为它提供了一个备用 Mono/Flux,而 doOn* 是副作用运算符。

注意:示例在 Kotlin 中

下面是一个例子:

fun saveItems(item: Item) = testRepository.save(item)
        .onErrorResume {
            Mono.error(
                onErrorResumeHandler(
                    it,
                    "APP-1002",
                    "Error occurred while saving the something :P, contact admin"
                )
            )
        }

fun onErrorResumeHandler(exception: Throwable, errorCode: String, errorMessage: String) =
    if (exception is TestRepositoryException) exception else
        TestServiceException(errorCode, errorMessage)

应该有一个中央异常处理程序,我们可以通过扩展来创建AbstractErrorWebExceptionHandler. 顺序为 -2 以取代默认值。

下面是一个例子:

@Component
@Order(-2)
class BaseControllerAdvice(
    errorAttributes: ErrorAttributes,
    resources: WebProperties.Resources,
    applicationContext: ApplicationContext,
    serverCodecConfigurer: ServerCodecConfigurer
) : AbstractErrorWebExceptionHandler(errorAttributes, resources, applicationContext) {

    val log = logger()

    init {
        setMessageWriters(serverCodecConfigurer.writers)
    }

    override fun getRoutingFunction(errorAttributes: ErrorAttributes?) =
        router {
            RequestPredicates.all().invoke(this@BaseControllerAdvice::renderErrorResponse)
        }
    //RouterFunctions.route(RequestPredicates.all(),this::renderErrorResponse)

    fun renderErrorResponse(
        request: ServerRequest
    ): Mono<ServerResponse> {
        val errorPropertiesMap = getErrorAttributes(
            request,
            ErrorAttributeOptions.defaults()
        )
        val ex: ApplicationException = getError(request) as ApplicationException
        log.info("Error attributes:{}", request)
        return ServerResponse.status(HttpStatus.BAD_REQUEST)
            .contentType(MediaType.APPLICATION_JSON)
            .body(BodyInserters.fromValue(ErrorResponseVO(ex.errorCode, ex.errorMessage)))
    }

    data class ErrorResponseVO(val errorMessage: String, val errorCode: String)

}
于 2021-06-26T20:40:56.640 回答