1

我试图在反应式编程中获得从 redis 读取的执行时间,在查找文档时我能够看到该elapsed()方法将执行相同的操作并实现如下代码。

Flux.fromIterable(getActions(httpHeaders))
                .parallel()
                .runOn(Schedulers.parallel())
                .flatMap(actionFact -> methodToReadFromCache(actionFact))
                .sequential();

public Mono<ActionFact> methodToReadFromCache(actionFact) {
    return Mono.fromCallable(() -> getKey(actionFact))
                .flatMap(cacheKey ->
                  redisOperations.hasKey(key)
                                .flatMap(aBoolean -> {
                                    if (aBoolean) {
                                        return redisOperations.opsForValue().get(cacheKey);
                                    }
                                    return authzService.getRolePermissions(actionFact)
                                            .flatMap(policySetResponse ->
                                                    //save in cache
                                            );
                                })
                                .elapsed()
                                .flatMap(lambda -> {
                                    LOG.info("cache/service processing key:{}, time:{}", key, lambda.getT1());
                                    return Mono.just(lambda.getT2());
                                });

输出:

cache/service processing key:KEY1, time:3 
cache/service processing key:KEY2, time:4 
cache/service processing key:KEY3, time:18 
cache/service processing key:KEY4, time:34 
cache/service processing key:KEY5, time:46 
cache/service processing key:KEY6, time:57 
cache/service processing key:KEY7, time:70 
cache/service processing key:KEY8, time:81 
cache/service processing key:KEY9, time:91 
cache/service processing key:KEY10, time:103
cache/service processing key:KEY11, time:112
cache/service processing key:KEY12, time:121
cache/service processing key:KEY13, time:134
cache/service processing key:KEY14, time:146
cache/service processing key:KEY15, time:159

我预计每个缓存请求所花费的时间将小于 5 毫秒,就像第一个和第二个请求一样,但情况并非如此。是否elapsed()将当前获取时间添加到累积中?根据我的理解,从通量发出的每个项目都是独立的?

4

2 回答 2

1

Mono#elapsed测量从Mono订阅到发布Mono项目之间的时间(onNext)。

在您的情况下,导致订阅和计时器启动的原因是flatMap调用methodToReadFromCache.

是什么导致 onNext 以及因此计时的是hasKeyif/else 部分(redisOperations.opsForValue().get(cacheKey)vs authzService)的组合。

由于我们处于并行模式,因此外部 flatMap 的计时器数量应至少与 CPU 数量一样多。

但是,时间偏差的事实可能暗示了某些事情正在阻塞或容量有限的事实。例如,会不会是 redisTemplate 一次只能处理几个键?

于 2019-07-12T12:49:47.453 回答
0

根据文档

我想将排放与(Tuple2<Long, T>)测量的时间联系起来……​</p>

  • 自订阅以来:已过

  • 从时间的黎明开始(嗯,计算机时间):时间戳

elapsed是自订阅以来测量的时间。因此,您订阅并开始发送,自您订阅服务以来的时间会增加。

官方文档

于 2019-07-10T12:53:42.200 回答