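I'm running into a NullPointerException with Ratpack when using Promise.cache in combination with multiple downstream promises and ParallelBatch. It's not clear to me from the docs whether my usage is incorrect or whether this is a bug in Ratpack.

Here is a reduced test case that demonstrates the problem: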

@Test
public void foo() throws Exception {
    List<Promise<Integer>> promises = new ArrayList<>();

    for (int i = 0; i < 25; i++) {
        Promise<Integer> p = Promise.value(12);
        p = p.cache();
        promises.add(p.map(v -> v + 1));
        promises.add(p.map(v -> v + 2));
    }

    final List<Integer> results = ExecHarness.yieldSingle(c ->
            ParallelBatch.of(promises).yield()
    ).getValueOrThrow();
}

Running this test 10000 times locally gives a failure rate of around 10 / 10000, with a NullPointerException that looks like this:

java.lang.NullPointerException
    at ratpack.exec.internal.CachingUpstream.yield(CachingUpstream.java:93)
    at ratpack.exec.internal.CachingUpstream.tryDrain(CachingUpstream.java:65)
    at ratpack.exec.internal.CachingUpstream.lambda$connect$0(CachingUpstream.java:116)
    at ratpack.exec.internal.CachingUpstream$$Lambda$58/1438461739.connect(Unknown Source)
    at ratpack.exec.internal.DefaultExecution.lambda$null$2(DefaultExecution.java:122)
    at ratpack.exec.internal.DefaultExecution$$Lambda$33/2092087501.execute(Unknown Source)
    at ratpack.exec.internal.DefaultExecution$SingleEventExecStream.exec(DefaultExecution.java:489)
    at ratpack.exec.internal.DefaultExecution.exec(DefaultExecution.java:216)
    at ratpack.exec.internal.DefaultExecution.exec(DefaultExecution.java:209)
    at ratpack.exec.internal.DefaultExecution.drain(DefaultExecution.java:179)
    at ratpack.exec.internal.DefaultExecution.<init>(DefaultExecution.java:92)
    at ratpack.exec.internal.DefaultExecController$1.lambda$start$0(DefaultExecController.java:195)
    at ratpack.exec.internal.DefaultExecController$1$$Lambda$7/1411892748.call(Unknown Source)
    at io.netty.util.concurrent.PromiseTask.run(PromiseTask.java:73)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
    at ratpack.exec.internal.DefaultExecController$ExecControllerBindingThreadFactory.lambda$newThread$0(DefaultExecController.java:136)
    at ratpack.exec.internal.DefaultExecController$ExecControllerBindingThreadFactory$$Lambda$8/1157058691.run(Unknown Source)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:745)
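
A failure rate like the one quoted above can be measured with a plain repeat loop around the test body. The following is a minimal sketch using only the JDK. `FlakyRunCounter` and `countFailures` are hypothetical names introduced here for illustration, and the stand-in task merely simulates an intermittent failure; in practice the body of `foo()` would be passed in instead.

```java
import java.util.concurrent.Callable;

// Hypothetical helper, not part of Ratpack or JUnit.
class FlakyRunCounter {

    // Runs the task the given number of times and counts the runs that threw.
    static int countFailures(int runs, Callable<?> task) {
        int failures = 0;
        for (int i = 0; i < runs; i++) {
            try {
                task.call();
            } catch (Throwable t) {
                // Any throwable counts as a failed run in this sketch.
                failures++;
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        // Stand-in task that fails on every 1000th call (simulated, not the real test).
        int[] calls = {0};
        int failures = countFailures(10_000, () -> {
            if (++calls[0] % 1000 == 0) {
                throw new IllegalStateException("simulated intermittent failure");
            }
            return null;
        });
        System.out.println(failures + " / 10000 runs failed");
    }
}
```

Catching `Throwable` is deliberate here so that assertion errors are counted as well as exceptions.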

Not using cache in this test case makes the problem go away, as does subscribing to each cached promise only once.

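My question is: is this incorrect usage of Ratpack's API, or is it a bug in the framework? If the former, can you point me to something in the docs that explains why this usage is wrong?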

1 Answer

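Even though your example is not the best use case for caching promises (re-creating and caching a promise that holds the same value on every iteration does not make much sense), you have actually found a race condition bug in the CachingUpstream<T> class.

I did some experiments to figure out what is happening, and here are my findings. First, I created a promise of the value 12 that supplies a custom (more verbose) implementation of the CachingUpstream<T> object. I took the body of Promise.value(12) and overrode inline the cacheResultIf(Predicate<? super ExecResult<T>> shouldCache) method, which by default returns a CachingUpstream<T> instance: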

Promise<Integer> p = new DefaultPromise<Integer>(down -> DefaultExecution.require().delimit(down::error, continuation ->
        continuation.resume(() -> down.success(12))
)) {
    @Override
    public Promise<Integer> cacheResultIf(Predicate<? super ExecResult<Integer>> shouldCache) {
        return transform(up -> {
            return new TestCachingUpstream<>(up, shouldCache.function(Duration.ofSeconds(-1), Duration.ZERO));
        });
    }
};

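Next, I created a TestCachingUpstream<T> class by copying the body of the original class, and I added a few things, such as:

  • I gave every TestCachingUpstream<T> an internal ID (a random UUID) to make it easier to trace the execution of each promise.
  • I added some verbose log messages for specific events during the promise's execution.

I did not change the implementation of the methods. I only wanted to trace the execution flow and keep the original implementation as it was. My custom class looks like this: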

private static class TestCachingUpstream<T> implements Upstream<T> {
    private final String id = UUID.randomUUID().toString();

    private Upstream<? extends T> upstream;

    private final Clock clock;
    private final AtomicReference<TestCachingUpstream.Cached<? extends T>> ref = new AtomicReference<>();
    private final Function<? super ExecResult<T>, Duration> ttlFunc;

    private final AtomicBoolean pending = new AtomicBoolean();
    private final AtomicBoolean draining = new AtomicBoolean();
    private final Queue<Downstream<? super T>> waiting = PlatformDependent.newMpscQueue();

    public TestCachingUpstream(Upstream<? extends T> upstream, Function<? super ExecResult<T>, Duration> ttl) {
        this(upstream, ttl, Clock.systemUTC());
    }

    @VisibleForTesting
    TestCachingUpstream(Upstream<? extends T> upstream, Function<? super ExecResult<T>, Duration> ttl, Clock clock) {
        this.upstream = upstream;
        this.ttlFunc = ttl;
        this.clock = clock;
    }

    private void tryDrain() {
        if (draining.compareAndSet(false, true)) {
            try {
                TestCachingUpstream.Cached<? extends T> cached = ref.get();
                if (needsFetch(cached)) {
                    if (pending.compareAndSet(false, true)) {
                        Downstream<? super T> downstream = waiting.poll();

                        System.out.printf("[%s] [%s] no pending execution and downstream is %s and cached is %s...%n", id, Thread.currentThread().getName(), downstream == null ? "null" : "not null", cached);

                        if (downstream == null) {
                            pending.set(false);
                        } else {
                            try {
                                yield(downstream);
                            } catch (Throwable e) {
                                System.out.printf("[%s] [%s] calling receiveResult after catching exception %s%n", id, Thread.currentThread().getName(), e.getClass());
                                receiveResult(downstream, ExecResult.of(Result.error(e)));
                            }
                        }
                    }
                } else {
                    System.out.printf("[%s] [%s] upstream does not need fetching...%n", id, Thread.currentThread().getName());
                    Downstream<? super T> downstream = waiting.poll();
                    while (downstream != null) {
                        downstream.accept(cached.result);
                        downstream = waiting.poll();
                    }
                }
            } finally {
                draining.set(false);
            }
        }

        if (!waiting.isEmpty() && !pending.get() && needsFetch(ref.get())) {
            tryDrain();
        }
    }

    private boolean needsFetch(TestCachingUpstream.Cached<? extends T> cached) {
        return cached == null || (cached.expireAt != null && cached.expireAt.isBefore(clock.instant()));
    }

    private void yield(final Downstream<? super T> downstream) throws Exception {
        System.out.printf("[%s] [%s] calling yield... %s %n", id, Thread.currentThread().getName(), upstream == null ? "upstream is null..." : "");
        upstream.connect(new Downstream<T>() {
            public void error(Throwable throwable) {
                System.out.printf("[%s] [%s] upstream.connect.error%n", id, Thread.currentThread().getName());
                receiveResult(downstream, ExecResult.of(Result.<T>error(throwable)));
            }

            @Override
            public void success(T value) {
                System.out.printf("[%s] [%s] upstream.connect.success%n", id, Thread.currentThread().getName());
                receiveResult(downstream, ExecResult.of(Result.success(value)));
            }

            @Override
            public void complete() {
                System.out.printf("[%s] [%s] upstream.connect.complete%n", id, Thread.currentThread().getName());
                receiveResult(downstream, CompleteExecResult.get());
            }
        });
    }

    @Override
    public void connect(Downstream<? super T> downstream) throws Exception {
        TestCachingUpstream.Cached<? extends T> cached = this.ref.get();
        if (needsFetch(cached)) {
            Promise.<T>async(d -> {
                waiting.add(d);
                tryDrain();
            }).result(downstream::accept);
        } else {
            downstream.accept(cached.result);
        }
    }

    private void receiveResult(Downstream<? super T> downstream, ExecResult<T> result) {
        Duration ttl = Duration.ofSeconds(0);
        try {
            ttl = ttlFunc.apply(result);
        } catch (Throwable e) {
            if (result.isError()) {
                //noinspection ThrowableResultOfMethodCallIgnored
                result.getThrowable().addSuppressed(e);
            } else {
                result = ExecResult.of(Result.error(e));
            }
        }

        Instant expiresAt;
        if (ttl.isNegative()) {
            expiresAt = null; // eternal
            System.out.printf("[%s] [%s] releasing upstream... (%s) %n", id, Thread.currentThread().getName(), result.toString());
            upstream = null; // release
        } else if (ttl.isZero()) {
            expiresAt = clock.instant().minus(Duration.ofSeconds(1));
        } else {
            expiresAt = clock.instant().plus(ttl);
        }

        ref.set(new TestCachingUpstream.Cached<>(result, expiresAt));
        pending.set(false);

        downstream.accept(result);

        tryDrain();
    }

    static class Cached<T> {
        final ExecResult<T> result;
        final Instant expireAt;

        Cached(ExecResult<T> result, Instant expireAt) {
            this.result = result;
            this.expireAt = expireAt;
        }
    }
}

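I reduced the number of steps in the for loop from 25 to 3 to make the console output more concise.

Successful test execution (no race condition)

Let's look at what the flow of a correct execution looks like: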

[c9a70043-36b8-44f1-b8f3-dd8ce30ca0ef] [ratpack-compute-22-2] no pending execution and downstream is not null and cached is null...
[c9a70043-36b8-44f1-b8f3-dd8ce30ca0ef] [ratpack-compute-22-2] calling yield...  
[c9a70043-36b8-44f1-b8f3-dd8ce30ca0ef] [ratpack-compute-22-2] upstream.connect.success
[c9a70043-36b8-44f1-b8f3-dd8ce30ca0ef] [ratpack-compute-22-2] releasing upstream... (ExecResult{complete=false, error=null, value=12}) 
[c9a70043-36b8-44f1-b8f3-dd8ce30ca0ef] [ratpack-compute-22-2] upstream does not need fetching...
[5c740555-3638-4f3d-8a54-162d37bcb695] [ratpack-compute-22-4] no pending execution and downstream is not null and cached is null...
[5c740555-3638-4f3d-8a54-162d37bcb695] [ratpack-compute-22-4] calling yield...  
[5c740555-3638-4f3d-8a54-162d37bcb695] [ratpack-compute-22-4] upstream.connect.success
[5c740555-3638-4f3d-8a54-162d37bcb695] [ratpack-compute-22-4] releasing upstream... (ExecResult{complete=false, error=null, value=12}) 
[5c740555-3638-4f3d-8a54-162d37bcb695] [ratpack-compute-22-4] upstream does not need fetching...
[c47a8f8a-5f93-4d2f-ac18-63ed76848b9f] [ratpack-compute-22-6] no pending execution and downstream is not null and cached is null...
[c47a8f8a-5f93-4d2f-ac18-63ed76848b9f] [ratpack-compute-22-6] calling yield...  
[c47a8f8a-5f93-4d2f-ac18-63ed76848b9f] [ratpack-compute-22-6] upstream.connect.success
[c47a8f8a-5f93-4d2f-ac18-63ed76848b9f] [ratpack-compute-22-6] releasing upstream... (ExecResult{complete=false, error=null, value=12}) 
[c47a8f8a-5f93-4d2f-ac18-63ed76848b9f] [ratpack-compute-22-6] upstream does not need fetching...

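As you can see, each iteration produces 5 console log lines for the cached promise.

  • When the tryDrain method is called for the first time, there is no cached result yet, so it goes down to the yield(downstream); method call.
  • The call to yield(downstream) completes successfully, and receiveResult(downstream, ExecResult.of(Result.success(value))); is called from inside the success callback.
  • Promise.cache() uses an infinite expiration date by passing a negative duration, which is why the receiveResult() method releases the upstream object by setting it to null.
  • Before it completes, receiveResult() stores the cached result in the internal ref object, and it calls tryDrain() before exiting the method.
  • The tryDrain() method sees the previously cached result on the next call on the cached promise (p.map(v -> v + 2)), so it passes the cached result directly to the downstream.

This scenario repeats for all 3 promises created inside the for loop.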
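
The TTL handling described in the steps above can be isolated into a small function. This is a minimal sketch that mirrors the branch in receiveResult() using only the JDK. `TtlMapping` and `expiresAt` are hypothetical names, and the fixed clock is there only to make the output reproducible.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;

// Hypothetical helper that mirrors the TTL branch of receiveResult().
class TtlMapping {

    // Returns null for "never expires", following the convention in the class above.
    static Instant expiresAt(Duration ttl, Clock clock) {
        if (ttl.isNegative()) {
            return null; // eternal: this is the case Promise.cache() ends up in
        } else if (ttl.isZero()) {
            return clock.instant().minus(Duration.ofSeconds(1)); // already expired
        } else {
            return clock.instant().plus(ttl);
        }
    }

    public static void main(String[] args) {
        Clock fixed = Clock.fixed(Instant.parse("2018-06-14T10:00:00Z"), ZoneOffset.UTC);
        System.out.println(expiresAt(Duration.ofSeconds(-1), fixed)); // null
        System.out.println(expiresAt(Duration.ZERO, fixed));          // 2018-06-14T09:59:59Z
        System.out.println(expiresAt(Duration.ofMinutes(5), fixed));  // 2018-06-14T10:05:00Z
    }
}
```

Only the negative-TTL branch releases the upstream in the class above, so that is the only branch in which a later yield(downstream) can find upstream set to null.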

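Failed test execution (race condition)

Running the test with those System.out.printf() calls made it fail somewhat less often, mostly because the I/O consumes some CPU cycles, which gives the unsynchronized part of the code a few more cycles to avoid the race condition. It still happens, though, so let's look at what the output of a failed test looks like: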

[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-2] no pending execution and downstream is not null and cached is null...
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-2] calling yield...  
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-2] upstream.connect.success
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-2] releasing upstream... (ExecResult{complete=false, error=null, value=12}) 
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-3] no pending execution and downstream is not null and cached is null...
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-3] calling yield... upstream is null... 
[4f00c50a-4706-4d22-b905-096934b7c374] [ratpack-compute-786-4] no pending execution and downstream is not null and cached is null...
[4f00c50a-4706-4d22-b905-096934b7c374] [ratpack-compute-786-4] calling yield...  
[4f00c50a-4706-4d22-b905-096934b7c374] [ratpack-compute-786-4] upstream.connect.success
[4f00c50a-4706-4d22-b905-096934b7c374] [ratpack-compute-786-4] releasing upstream... (ExecResult{complete=false, error=null, value=12}) 
[4f00c50a-4706-4d22-b905-096934b7c374] [ratpack-compute-786-4] upstream does not need fetching...
[8b27d16f-dc91-4341-b630-cf5c959c45e8] [ratpack-compute-786-6] no pending execution and downstream is not null and cached is null...
[8b27d16f-dc91-4341-b630-cf5c959c45e8] [ratpack-compute-786-6] calling yield...  
[8b27d16f-dc91-4341-b630-cf5c959c45e8] [ratpack-compute-786-6] upstream.connect.success
[8b27d16f-dc91-4341-b630-cf5c959c45e8] [ratpack-compute-786-6] releasing upstream... (ExecResult{complete=false, error=null, value=12}) 
[8b27d16f-dc91-4341-b630-cf5c959c45e8] [ratpack-compute-786-6] upstream does not need fetching...
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-3] calling receiveResult after catching exception class java.lang.NullPointerException
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-3] releasing upstream... (ExecResult{complete=false, error=java.lang.NullPointerException, value=null}) 

java.lang.NullPointerException
    at app.AnotherPromiseTest$TestCachingUpstream.yield(AnotherPromiseTest.java:120)
    at app.AnotherPromiseTest$TestCachingUpstream.tryDrain(AnotherPromiseTest.java:89)
    at app.AnotherPromiseTest$TestCachingUpstream.lambda$connect$0(AnotherPromiseTest.java:146)
    at ratpack.exec.internal.DefaultExecution.lambda$null$2(DefaultExecution.java:122)
    at ratpack.exec.internal.DefaultExecution$SingleEventExecStream.exec(DefaultExecution.java:489)
    at ratpack.exec.internal.DefaultExecution.exec(DefaultExecution.java:216)
    at ratpack.exec.internal.DefaultExecution.exec(DefaultExecution.java:209)
    at ratpack.exec.internal.DefaultExecution.drain(DefaultExecution.java:179)
    at ratpack.exec.internal.DefaultExecution.<init>(DefaultExecution.java:92)
    at ratpack.exec.internal.DefaultExecController$1.lambda$start$0(DefaultExecController.java:195)
    at io.netty.util.concurrent.PromiseTask.run(PromiseTask.java:73)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:309)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
    at ratpack.exec.internal.DefaultExecController$ExecControllerBindingThreadFactory.lambda$newThread$0(DefaultExecController.java:136)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)

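This is the output of a failing test. I ran it in IntelliJ IDEA with the run configuration set to repeat until failure. It took some time to make the test fail, but after a few attempts it finally failed around iteration 1500. In this case we can see that the race condition hit the first promise created in the for loop. You can see that after the upstream object has been released inside the receiveResult() method

[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-2] releasing upstream... (ExecResult{complete=false, error=null, value=12}) 

and tryDrain has been called before exiting the method, the next execution of the cached promise does not yet see the previously cached result, so it runs down to the yield(downstream) method again. By then the upstream object has already been released by setting it to null. The yield(downstream) method expects the upstream object to be correctly initialized; otherwise it throws an NPE.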
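
One interleaving that matches the log above can be replayed deterministically on a single thread: the second subscriber reads ref while it is still null, the first subscriber's result then arrives, and the second subscriber continues with its stale read. The sketch below is a simplified, hypothetical model and not Ratpack code. `StaleReadModel` keeps only the three fields involved, and the re-check shown in `drainWithRecheck` is an assumed mitigation for illustration, not necessarily how the library addresses the problem.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Simplified, hypothetical model of the state in the class above; not Ratpack code.
class StaleReadModel {
    private Object upstream = new Object(); // stands in for Upstream<T>
    private final AtomicReference<Integer> ref = new AtomicReference<>();
    private final AtomicBoolean pending = new AtomicBoolean();

    // Mirrors receiveResult() for an eternal entry: release, publish, clear pending.
    private void receiveResult(int value) {
        upstream = null;
        ref.set(value);
        pending.set(false);
    }

    // Mirrors the fetch branch of tryDrain(): trusts the earlier read of ref.
    private String drainTrustingEarlierRead(Integer cachedSeenEarlier) {
        if (cachedSeenEarlier == null && pending.compareAndSet(false, true)) {
            return upstream == null ? "NPE: upstream already released" : "fetch";
        }
        return "no fetch";
    }

    // Assumed mitigation: read ref again after winning the pending flag.
    private String drainWithRecheck(Integer cachedSeenEarlier) {
        if (cachedSeenEarlier == null && pending.compareAndSet(false, true)) {
            Integer fresh = ref.get();
            if (fresh != null) {
                pending.set(false);
                return "serve cached " + fresh;
            }
            return upstream == null ? "NPE: upstream already released" : "fetch";
        }
        return "no fetch";
    }

    static String replayOriginal() {
        StaleReadModel m = new StaleReadModel();
        m.pending.set(true);         // subscriber A has started the fetch
        Integer stale = m.ref.get(); // subscriber B reads ref: still null
        m.receiveResult(12);         // A's result arrives and upstream is released
        return m.drainTrustingEarlierRead(stale); // B continues with its stale read
    }

    static String replayWithRecheck() {
        StaleReadModel m = new StaleReadModel();
        m.pending.set(true);
        Integer stale = m.ref.get();
        m.receiveResult(12);
        return m.drainWithRecheck(stale);
    }

    public static void main(String[] args) {
        System.out.println(replayOriginal());    // NPE: upstream already released
        System.out.println(replayWithRecheck()); // serve cached 12
    }
}
```

The model makes the window visible: the check of ref and the acquisition of pending are two separate steps, and receiveResult() can complete between them.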

I tried to debug this method:

private boolean needsFetch(TestCachingUpstream.Cached<? extends T> cached) {
    return cached == null || (cached.expireAt != null && cached.expireAt.isBefore(clock.instant()));
}

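This is the method that decides whether the cached promise needs to be fetched. However, as soon as I added any logging statements, it started causing a StackOverflowError. I am guessing that in some rare cases cached.expireAt.isBefore(clock.instant()) returns false, because the cached object comes from an AtomicReference, so this object should be passed correctly between method executions.

Here is the full test class I used in my experiments: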
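
For reference, the four cases this predicate distinguishes can be checked in isolation. The sketch below repeats the same boolean expression with the Cached object flattened into two parameters. `NeedsFetchCases` and the `hasCached` flag are hypothetical and exist only so the example runs without Ratpack on the classpath.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;

// Hypothetical stand-alone copy of the needsFetch() expression.
class NeedsFetchCases {

    // hasCached == false models ref.get() returning null.
    static boolean needsFetch(boolean hasCached, Instant expireAt, Clock clock) {
        return !hasCached || (expireAt != null && expireAt.isBefore(clock.instant()));
    }

    public static void main(String[] args) {
        Clock now = Clock.fixed(Instant.parse("2018-06-14T10:00:00Z"), ZoneOffset.UTC);
        Instant past = now.instant().minus(Duration.ofSeconds(1));
        Instant future = now.instant().plus(Duration.ofMinutes(5));

        System.out.println(needsFetch(false, null, now));  // true: nothing cached yet
        System.out.println(needsFetch(true, null, now));   // false: eternal entry
        System.out.println(needsFetch(true, past, now));   // true: entry expired
        System.out.println(needsFetch(true, future, now)); // false: entry still valid
    }
}
```

With Promise.cache() the stored expireAt is null, so in the code shown the isBefore branch is not evaluated for this test, and needsFetch returns true only while ref still holds null.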

import com.google.common.annotations.VisibleForTesting;
import io.netty.util.internal.PlatformDependent;
import org.junit.Test;
import ratpack.exec.*;
import ratpack.exec.internal.CompleteExecResult;
import ratpack.exec.internal.DefaultExecution;
import ratpack.exec.internal.DefaultPromise;
import ratpack.exec.util.ParallelBatch;
import ratpack.func.Function;
import ratpack.func.Predicate;
import ratpack.test.exec.ExecHarness;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class AnotherPromiseTest {

    @Test
    public void foo() throws Exception {
        List<Promise<Integer>> promises = new ArrayList<>();

        for (int i = 0; i < 3; i++) {
            Promise<Integer> p = new DefaultPromise<Integer>(down -> DefaultExecution.require().delimit(down::error, continuation ->
                    continuation.resume(() -> down.success(12))
            )) {
                @Override
                public Promise<Integer> cacheResultIf(Predicate<? super ExecResult<Integer>> shouldCache) {
                    return transform(up -> {
                        return new TestCachingUpstream<>(up, shouldCache.function(Duration.ofSeconds(-1), Duration.ZERO));
                    });
                }
            };

            p = p.cache();
            promises.add(p.map(v -> v + 1));
            promises.add(p.map(v -> v + 2));
        }

        ExecHarness.yieldSingle(c -> ParallelBatch.of(promises).yield()).getValueOrThrow();
    }

    private static class TestCachingUpstream<T> implements Upstream<T> {
        private final String id = UUID.randomUUID().toString();

        private Upstream<? extends T> upstream;

        private final Clock clock;
        private final AtomicReference<TestCachingUpstream.Cached<? extends T>> ref = new AtomicReference<>();
        private final Function<? super ExecResult<T>, Duration> ttlFunc;

        private final AtomicBoolean pending = new AtomicBoolean();
        private final AtomicBoolean draining = new AtomicBoolean();
        private final Queue<Downstream<? super T>> waiting = PlatformDependent.newMpscQueue();

        public TestCachingUpstream(Upstream<? extends T> upstream, Function<? super ExecResult<T>, Duration> ttl) {
            this(upstream, ttl, Clock.systemUTC());
        }

        @VisibleForTesting
        TestCachingUpstream(Upstream<? extends T> upstream, Function<? super ExecResult<T>, Duration> ttl, Clock clock) {
            this.upstream = upstream;
            this.ttlFunc = ttl;
            this.clock = clock;
        }

        private void tryDrain() {
            if (draining.compareAndSet(false, true)) {
                try {
                    TestCachingUpstream.Cached<? extends T> cached = ref.get();
                    if (needsFetch(cached)) {
                        if (pending.compareAndSet(false, true)) {
                            Downstream<? super T> downstream = waiting.poll();

                            System.out.printf("[%s] [%s] no pending execution and downstream is %s and cached is %s...%n", id, Thread.currentThread().getName(), downstream == null ? "null" : "not null", cached);

                            if (downstream == null) {
                                pending.set(false);
                            } else {
                                try {
                                    yield(downstream);
                                } catch (Throwable e) {
                                    System.out.printf("[%s] [%s] calling receiveResult after catching exception %s%n", id, Thread.currentThread().getName(), e.getClass());
                                    receiveResult(downstream, ExecResult.of(Result.error(e)));
                                }
                            }
                        }
                    } else {
                        System.out.printf("[%s] [%s] upstream does not need fetching...%n", id, Thread.currentThread().getName());
                        Downstream<? super T> downstream = waiting.poll();
                        while (downstream != null) {
                            downstream.accept(cached.result);
                            downstream = waiting.poll();
                        }
                    }
                } finally {
                    draining.set(false);
                }
            }

            if (!waiting.isEmpty() && !pending.get() && needsFetch(ref.get())) {
                tryDrain();
            }
        }

        private boolean needsFetch(TestCachingUpstream.Cached<? extends T> cached) {
            return cached == null || (cached.expireAt != null && cached.expireAt.isBefore(clock.instant()));
        }

        private void yield(final Downstream<? super T> downstream) throws Exception {
            System.out.printf("[%s] [%s] calling yield... %s %n", id, Thread.currentThread().getName(), upstream == null ? "upstream is null..." : "");
            upstream.connect(new Downstream<T>() {
                public void error(Throwable throwable) {
                    System.out.printf("[%s] [%s] upstream.connect.error%n", id, Thread.currentThread().getName());
                    receiveResult(downstream, ExecResult.of(Result.<T>error(throwable)));
                }

                @Override
                public void success(T value) {
                    System.out.printf("[%s] [%s] upstream.connect.success%n", id, Thread.currentThread().getName());
                    receiveResult(downstream, ExecResult.of(Result.success(value)));
                }

                @Override
                public void complete() {
                    System.out.printf("[%s] [%s] upstream.connect.complete%n", id, Thread.currentThread().getName());
                    receiveResult(downstream, CompleteExecResult.get());
                }
            });
        }

        @Override
        public void connect(Downstream<? super T> downstream) throws Exception {
            TestCachingUpstream.Cached<? extends T> cached = this.ref.get();
            if (needsFetch(cached)) {
                Promise.<T>async(d -> {
                    waiting.add(d);
                    tryDrain();
                }).result(downstream::accept);
            } else {
                downstream.accept(cached.result);
            }
        }

        private void receiveResult(Downstream<? super T> downstream, ExecResult<T> result) {
            Duration ttl = Duration.ofSeconds(0);
            try {
                ttl = ttlFunc.apply(result);
            } catch (Throwable e) {
                if (result.isError()) {
                    //noinspection ThrowableResultOfMethodCallIgnored
                    result.getThrowable().addSuppressed(e);
                } else {
                    result = ExecResult.of(Result.error(e));
                }
            }

            Instant expiresAt;
            if (ttl.isNegative()) {
                expiresAt = null; // eternal
                System.out.printf("[%s] [%s] releasing upstream... (%s) %n", id, Thread.currentThread().getName(), result.toString());
                upstream = null; // release
            } else if (ttl.isZero()) {
                expiresAt = clock.instant().minus(Duration.ofSeconds(1));
            } else {
                expiresAt = clock.instant().plus(ttl);
            }

            ref.set(new TestCachingUpstream.Cached<>(result, expiresAt));
            pending.set(false);

            downstream.accept(result);

            tryDrain();
        }

        static class Cached<T> {
            final ExecResult<T> result;
            final Instant expireAt;

            Cached(ExecResult<T> result, Instant expireAt) {
                this.result = result;
                this.expireAt = expireAt;
            }
        }
    }
}

Hope it helps.

Answered 2018-06-14T10:55:12.310