11

我有三个与 Project Reactor 有关的问题,我将在下面问他们。从我拥有的代码开始(它将被简化以更容易理解问题)。

Mono<Integer> doWithSession(Function<String, Mono<Integer>> callback, long timeout) {
  return Mono.just("hello")
        .compose(monostr -> monostr
            .doOnSuccess(str -> System.out.println("Suppose I want to release session here after all")) //(1)
            .doOnCancel(() -> System.out.println("cancelled")) //(2)
            .then(callback::apply)
            .timeoutMillis(timeout, Mono.error(new TimeoutException("Timeout after " + timeout)))
        );
}

并测试:

@Test
public void testDoWithSession2() throws Exception {
  Function<String, Mono<Integer>> fun1 = str -> Mono.fromCallable(() -> {
    System.out.println("do some long timed work");
    try {
      Thread.sleep(5000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    System.out.println("work has completed");
    return str.length();
  });

  StepVerifier.create(doWithSession(fun1,1000))
    .verifyError(TimeoutException.class);
}

所以和问题:

  1. 如何中断调用fun1并立即返回错误?(也许我做错了什么,但看起来错误不是在超时后而是在调用回调之后返回)
  2. 为什么doOnSuccess和同时doOnCancel调用?(我预计会调用(1)或(2),但不会同时调用两者)
  3. 以及如何处理以下情况:
    • 想象在代码Mono.just("hello")中正在获取连接;
    • callback我正在做一些与连接有关的事情并获得一些结果(Mono<Integer>在我的情况下);
    • 最后(成功或失败)我想释放会话(我尝试在(1)中这样做)。
4

2 回答 2

10

1)如您所见,使用.publishOn(Schedulers.single()). 这将确保在另一个线程上调用可调用对象并且仅阻塞所述线程。另外,它将允许取消可调用对象。

2)你的链条顺序很重要。您将.doOnSuccess其放在开头compose(顺便说一下,对于该特定示例,您实际上并不需要它,除非您想提取该 compose 函数以供以后重用)。因此,这意味着它从Mono.just基本获得通知,并在查询源后立即运行,甚至在您的处理发生之前......对于doOnCancel. 取消来自timeout触发...

3)有一个工厂可以从资源中创建序列并确保清理资源:Mono.using. 所以它看起来像这样:

public <T> Mono<T> doWithConnection(Function<String, Mono<T>> callback, long timeout) {
    return Mono.using(
            //the resource supplier:
            () -> {
                System.out.println("connection acquired");
                return "hello";
            },
            //create a Mono out of the resource. On any termination, the resource is cleaned up
            connection -> Mono.just(connection)
                              //the blocking callable needs own thread:
                              .publishOn(Schedulers.single())
                              //execute the callable and get result...
                              .then(callback::apply)
                              //...but cancel if it takes too long
                              .timeoutMillis(timeout)
                              //for demonstration we'll log when timeout triggers:
                              .doOnError(TimeoutException.class, e -> System.out.println("timed out")),
            //the resource cleanup:
            connection -> System.out.println("cleaned up " + connection));
}

这将返回Mono<T>可调用的 T 值。在生产代码中,您将订阅它以处理该值。在测试中,StepVerifier.create()将为您订阅。

让我们用您长时间运行的任务来证明这一点,看看它输出了什么:

@Test
public void testDoWithSession2() throws Exception {
    Function<String, Mono<Integer>> fun1 = str -> Mono.fromCallable(() -> {
        System.out.println("start some long timed work");
        //for demonstration we'll print some clock ticks
        for (int i = 1; i <= 5; i++) {
            try {
                Thread.sleep(1000);
                System.out.println(i + "s...");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("work has completed");
        return str.length();
    });

    //let two ticks show up
    StepVerifier.create(doWithConnection(fun1,2100))
                .verifyError(TimeoutException.class);
}

这输出:

connection acquired
start some long timed work
1s...
2s...
timed out
cleaned up hello

如果我们将超时设置为超过 5000,我们会得到以下结果。(有一个断言错误,因为 StepVerifier 需要超时):

connection acquired
start some long timed work
1s...
2s...
3s...
4s...
5s...
work has completed
cleaned up hello

java.lang.AssertionError: expectation "expectError(Class)" failed (expected: onError(TimeoutException); actual: onNext(5)
于 2017-01-05T10:20:21.903 回答
0

对于第一个问题,答案似乎是使用调度程序:

Mono<Integer> doWithSession(Function<String, Mono<Integer>> callback, long timeout) {
    Scheduler single = Schedulers.single();
    return Mono.just("hello")
            .compose(monostr -> monostr
                    .publishOn(single) // use scheduler
                    .then(callback::apply)
                    .timeoutMillis(timeout, Mono.error(new TimeoutException("Timeout after " + timeout)))
            );
}

第三个问题可以这样解决:

private Mono<Integer> doWithSession3(Function<String, Mono<Integer>> callback, long timeout) {
    Scheduler single = Schedulers.single();
    return Mono.just("hello")
            .then(str -> Mono.just(str) // here wrapping our string to new Mono
                    .publishOn(single)
                    .then(callback::apply)
                    .timeoutMillis(timeout, Mono.error(new TimeoutException("Timeout after " + timeout)))
                    .doAfterTerminate((res, throwable) -> System.out.println("Do anything with your string" + str))
            );
}
于 2017-01-04T13:02:55.390 回答