我有三个与 Project Reactor 有关的问题,我将在下面问他们。从我拥有的代码开始(它将被简化以更容易理解问题)。
Mono<Integer> doWithSession(Function<String, Mono<Integer>> callback, long timeout) {
return Mono.just("hello")
.compose(monostr -> monostr
.doOnSuccess(str -> System.out.println("Suppose I want to release session here after all")) //(1)
.doOnCancel(() -> System.out.println("cancelled")) //(2)
.then(callback::apply)
.timeoutMillis(timeout, Mono.error(new TimeoutException("Timeout after " + timeout)))
);
}
并测试:
@Test
public void testDoWithSession2() throws Exception {
Function<String, Mono<Integer>> fun1 = str -> Mono.fromCallable(() -> {
System.out.println("do some long timed work");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("work has completed");
return str.length();
});
StepVerifier.create(doWithSession(fun1,1000))
.verifyError(TimeoutException.class);
}
所以和问题:
- 如何中断调用
fun1
并立即返回错误?(也许我做错了什么,但看起来错误不是在超时后而是在调用回调之后返回) - 为什么
doOnSuccess
和同时doOnCancel
调用?(我预计会调用(1)或(2),但不会同时调用两者) - 以及如何处理以下情况:
- 想象在代码
Mono.just("hello")
中正在获取连接; - 在
callback
我正在做一些与连接有关的事情并获得一些结果(Mono<Integer>
在我的情况下); - 最后(成功或失败)我想释放会话(我尝试在(1)中这样做)。
- 想象在代码