1

我遇到了repeatWhenEmpty操作员的奇怪行为,这对我来说没有任何意义。我在 Github 上上传了一个存储库,其中包含最少的可重现样本:https ://github.com/codependent/rsocket-rating-service 。

考虑这个提供两个端点(HTTP @GetMapping("/requestRating") - WebSocket )的控制器@MessageMapping("request-rating")。注意这个调用.repeatWhenEmpty(Repeat.onlyIf<Any> { true }.backoff(Backoff.fixed(Duration.ofSeconds(1))))generateRating()链的中间:

@RestController
class RatingServiceRestController {

    private val FAIL_RATE = 0
    private val logger = LoggerFactory.getLogger(javaClass)

    @GetMapping("/requestRating")
    fun getRatingHttp(ratingRequest: RatingRequest): Mono<Rating> {
        return generateRating(ratingRequest)
    }

    @MessageMapping("request-rating")
    fun getRatingWebSocket(ratingRequest: RatingRequest): Mono<Rating> {
        return generateRating(ratingRequest)
    }

    private fun generateRating(ratingRequest: RatingRequest): Mono<Rating> {
        return doGenerateRating(ratingRequest)
                .doOnNext {
                    logger.info("Next1 {}", it)
                }
                .doOnCancel {
                    logger.info("Cancel1")
                }
                .doOnSuccess {
                    logger.info("Success1 {}", it)
                }
                .doOnError { throwable ->
                    logger.error("Error1 {}", throwable)
                }
                .doOnTerminate {
                    logger.info("Terminate1")
                }
                .repeatWhenEmpty(Repeat.onlyIf<Any> { true }.backoff(Backoff.fixed(Duration.ofSeconds(1))))
                .doOnNext {
                    logger.info("Next2 {}", it)
                }
                .doOnCancel {
                    logger.info("Cancel2")
                }
                .doOnSuccess {
                    logger.info("Success2 {}", it)
                }
                .doOnError { throwable ->
                    logger.error("Error2 {}", throwable)
                }
                .doOnTerminate {
                    logger.info("Terminate2")
                }
    }

    private fun doGenerateRating(ratingRequest: RatingRequest): Mono<Rating> {
        return Mono.defer {
            val random = (0..100).random()
            if (random <= FAIL_RATE) {
                Mono.empty()
            } else {
                Mono.just(Rating(ratingRequest.songId, (0..10).random()))
            }
        }
    }
}

启动应用程序后,我可以调用http://localhost:8080/requestRating?songId=1234它并返回一个结果,按预期显示这些日志:

RatingServiceRestController    : Next1 Rating(songId=1234, value=1)
RatingServiceRestController    : Success1 Rating(songId=1234, value=1)
RatingServiceRestController    : Terminate1
RatingServiceRestController    : Cancel1
RatingServiceRestController    : Next2 Rating(songId=1234, value=1)
RatingServiceRestController    : Success2 Rating(songId=1234, value=1)
RatingServiceRestController    : Terminate2

当我从 Websocket 调用相同的逻辑时:

  1. 使用权http://localhost:8080/index.html
  2. 填写任何字符串并推送发送

奇怪的是,这些是我看到的所有日志:

RatingServiceRestController    : Next1 Rating(songId=asfdasf, value=2)
RatingServiceRestController    : Success1 Rating(songId=asfdasf, value=2)
RatingServiceRestController    : Terminate1

大约三分钟后,出现了:

RatingServiceRestController    : Cancel2
RatingServiceRestController    : Cancel1

尽管已经生成了一个如 中所示的元素Next1,但没有调用doOnXXXafter 的运算符。repeatWhenEmpty客户显然也没有得到结果。

这里发生了什么?如何repeatWhenEmpty在 RSocket websocket 的上下文中使用?

更新:

我添加了一个log()运算符来获取更多信息。

HTTP 请求:

10:37:01.957  INFO 5202 --- [ctor-http-nio-2] reactor.Mono.Defer.1                     : onSubscribe(MonoNext.NextSubscriber)
10:37:01.959  INFO 5202 --- [ctor-http-nio-2] reactor.Mono.Defer.1                     : request(unbounded)
10:37:01.967  INFO 5202 --- [ctor-http-nio-2] c.c.r.r.c.RatingServiceRestController    : Next1 Rating(songId=1234, value=0)
10:37:01.968  INFO 5202 --- [ctor-http-nio-2] c.c.r.r.c.RatingServiceRestController    : Success1 Rating(songId=1234, value=0)
10:37:01.968  INFO 5202 --- [ctor-http-nio-2] c.c.r.r.c.RatingServiceRestController    : Terminate1
10:37:01.968  INFO 5202 --- [ctor-http-nio-2] c.c.r.r.c.RatingServiceRestController    : Cancel1
10:37:01.968  INFO 5202 --- [ctor-http-nio-2] reactor.Mono.Defer.1                     : onNext(Rating(songId=1234, value=0))
10:37:01.968  INFO 5202 --- [ctor-http-nio-2] c.c.r.r.c.RatingServiceRestController    : Next2 Rating(songId=1234, value=0)
10:37:01.968  INFO 5202 --- [ctor-http-nio-2] c.c.r.r.c.RatingServiceRestController    : Success2 Rating(songId=1234, value=0)
10:37:01.968  INFO 5202 --- [ctor-http-nio-2] c.c.r.r.c.RatingServiceRestController    : Terminate2
10:37:01.976  INFO 5202 --- [ctor-http-nio-2] reactor.Mono.Defer.1                     : onComplete()

RSocket 请求:

10:37:29.143  INFO 5202 --- [ctor-http-nio-6] reactor.Mono.Defer.2                     : onContextUpdate(Context1{reactor.onDiscard.local=reactor.core.publisher.Operators$$Lambda$720/0x0000000800597c40@6dfdbeee})
10:37:29.143  INFO 5202 --- [ctor-http-nio-6] reactor.Mono.Defer.2                     : onSubscribe(MonoNext.NextSubscriber)
10:37:29.143  INFO 5202 --- [ctor-http-nio-6] reactor.Mono.Defer.2                     : request(1)
10:37:29.143  INFO 5202 --- [ctor-http-nio-6] c.c.r.r.c.RatingServiceRestController    : Next1 Rating(songId=asdf, value=0)
10:37:29.143  INFO 5202 --- [ctor-http-nio-6] c.c.r.r.c.RatingServiceRestController    : Success1 Rating(songId=asdf, value=0)
10:37:29.143  INFO 5202 --- [ctor-http-nio-6] c.c.r.r.c.RatingServiceRestController    : Terminate1

三分钟后:

10:40:27.802  INFO 5202 --- [     parallel-1] c.c.r.r.c.RatingServiceRestController    : Cancel2
10:40:27.802  INFO 5202 --- [     parallel-1] reactor.Mono.Defer.2                     : cancel()
10:40:27.802  INFO 5202 --- [     parallel-1] c.c.r.r.c.RatingServiceRestController    : Cancel1

如您所见,存在一些差异

  1. 在 RSocket 请求中有一个onContextUpdate调用。
  2. RSocket 请求 1 个元素,HTTP 无界
  3. 尽管发射了一个元素(onNext),但 RSocket 执行似乎正在重试或做某事。CPU 在做一些工作时停留在 6%,这在 HTTP 调用中没有发生,如下图所示:

在此处输入图像描述

更新 2:

我一直在调试,并在第一次取消期间发现了这种执行差异,特别是在课堂上Operators

    public static void onDiscardMultiple(@Nullable Iterator<?> multiple, boolean knownToBeFinite, Context context) {
        if (multiple == null) return;
        if (!knownToBeFinite) return;

        Consumer<Object> hook = context.getOrDefault(Hooks.KEY_ON_DISCARD, null);
        if (hook != null) {
            try {
                multiple.forEachRemaining(o -> {
                    if (o != null) {
                        try {
                            hook.accept(o);
                        }
                        catch (Throwable t) {
                            log.warn("Error while discarding element from an Iterator, continuing with next element", t);
                        }
                    }
                });
            }
            catch (Throwable t) {
                log.warn("Error while discarding Iterator, stopping", t);
            }
        }
    }

HTTP:

语境:

在此处输入图像描述

Consumer<Object> hook = context.getOrDefault(Hooks.KEY_ON_DISCARD, null);

hook为空,所以它不执行:

if (hook != null) {
            try {
                multiple.forEachRemaining(o -> {...

RSocket:

语境:

在此处输入图像描述

在这种情况下,钩子是:

在此处输入图像描述

forEachRemaining它在块中无限循环:

    if (hook != null) {
            try {
                multiple.forEachRemaining(o -> {
                    if (o != null) {
                        try {
                            hook.accept(o);
                        }
                        catch (Throwable t) {
                            log.warn("Error while discarding element from an Iterator, continuing with next element", t);
                        }
                    }
                });
            }
4

1 回答 1

2

https://github.com/rsocket/rsocket-java/issues/860所示,Project Reactor 中似乎存在问题。

我按照建议更改为以下操作员,从而解决了问题:

.repeatWhenEmpty(30) { longFlux ->
      longFlux.delayElements(Duration.ofSeconds(1))
             .doOnNext { logger.info("Repeating {}", it) }
}
于 2020-06-04T21:26:13.797 回答