9

我目前正在使用SpringBoot 2、 netty和 jOOQ 上的 spring-boot-starter-webflux开发应用程序。
以下是我经过数小时的研究和 stackoverflow 搜索后得出的代码。我已经建立了很多日志记录,以便查看哪个线程上发生了什么。

用户控制器:

@RequestMapping(value = "/user", method = RequestMethod.POST)
public Mono<ResponseEntity<Integer>> createUser(@RequestBody ImUser user) {
    return Mono.just(user)
            .map(it -> {
                logger.debug("Receiving request on thread: " + Thread.currentThread().getName());
                return it;
            })
            .map(userService::create)
            .map(it -> {
                logger.debug("Sending response on thread: " + Thread.currentThread().getName());
                return ResponseEntity.status(HttpStatus.CREATED).body(it);
            })
            .mapError(DuplicateKeyException.class, e -> new SomeSpecialException(e.getMessage(), e));
}

用户服务:

public int create(ImUser user) {
    return Mono.just(user)
            .subscribeOn(Schedulers.elastic())
            .map(u -> {
                logger.debug("UserService thread: " + Thread.currentThread().getName());
                return imUserDao.insertUser(u);
            })
            .block();
}

用户道:

@Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.READ_COMMITTED, rollbackFor = Exception.class)
public int insertUser(ImUser user) {
    logger.debug("Insert DB on thread: " + Thread.currentThread().getName());
    return dsl.insertInto(IM_USER,IM_USER.VERSION, IM_USER.FIRST_NAME, IM_USER.LAST_NAME, IM_USER.BIRTHDATE, IM_USER.GENDER)
            .values(1, user.getFirstName(), user.getLastName(), user.getBirthdate(), user.getGender())
            .returning(IM_USER.ID)
            .fetchOne()
            .getId();
}

代码按预期工作,“接收请求”和“发送响应”都在同一个线程(reactor-http-server-epoll-x)上运行,而阻塞代码(对imUserDao.insertUser(u)的调用)运行在弹性调度程序线程(elastic-x)。事务绑定到调用带注释的方法的线程(即 elastic-x),因此按预期工作(我已经用不同的方法测试了它,这里没有发布,以保持简单)。

这是一个日志示例

20:57:21,384 DEBUG         admin.UserController| Receiving request on thread: reactor-http-server-epoll-7
20:57:21,387 DEBUG            admin.UserService| UserService thread: elastic-2
20:57:21,391 DEBUG        admin.ExtendedUserDao| Insert DB on thread: elastic-2
20:57:21,393 DEBUG         tools.LoggerListener| Executing query          
...
20:57:21,401 DEBUG              tools.StopWatch| Finishing                : Total: 9.355ms, +3.355ms
20:57:21,409 DEBUG         admin.UserController| Sending response on thread: reactor-http-server-epoll-7

我研究响应式编程很长时间了,但从来没有真正编写过任何响应式编程。现在我是,我想知道我是否做得正确。所以这是我的问题:

1.上面的代码是处理传入的HTTP请求,查询DB然后响应的好方法吗?为了我的理智,请忽略我内置的 logger.debug(...) 调用 :) 我有点期望有一个Flux< ImUser>作为控制器方法的参数,从某种意义上说,我有多个潜在请求的流,这些请求将在某个时间点出现,并且都将以相同的方式处理。相反,我发现的示例创建了Mono.from(...); 每次有请求进来。

2.在 UserService 中创建的第二个 Mono ( Mono.just(user) ) 感觉有些别扭。我知道我需要启动一个新流才能在弹性调度程序上运行代码,但是没有操作员可以做到这一点吗?

3.从代码的编写方式,我了解到UserService中的Mono会被阻塞,直到DB操作完成,但服务请求的原始流没有被阻塞。它是否正确?

4.我打算将Schedulers.elastic()替换为可以配置工作线程数量的并行调度程序。这个想法是最大工作线程数应该与最大数据库连接数相同。当 Scheduler 内的所有工作线程都忙时会发生什么?那是背压突然出现的时候吗?

5.我最初希望在我的控制器中包含以下代码:

return userService.create(user)
            .map(it -> ResponseEntity.status(HttpStatus.CREATED).body(it))
            .mapError(DuplicateKeyException.class, e -> new SomeSpecialException(e.getMessage(), e));

但我无法做到这一点并让事情在正确的线程中运行。有没有办法在我的代码中实现这一点?

任何帮助将不胜感激。谢谢!

4

1 回答 1

6

服务和控制器
你的服务被阻塞的事实是有问题的,因为在控制器中你从 a 内部调用一个阻塞方法map,它没有在单独的线程上移动。这有可能阻止所有控制器。

你可以做的是返回 a Monofrom UserService#create(删除block()最后的)。由于服务保证了 Dao 方法调用是隔离的,所以问题较少。从那里开始,无需Mono.just(user)在 Controller 中执行任何操作:只需调用 create 并直接在生成的 Mono 上开始链接操作符:

@RequestMapping(value = "/user", method = RequestMethod.POST)
public Mono<ResponseEntity<Integer>> createUser(@RequestBody ImUser user) {
    //this log as you saw was executed in the same thread as the controller method
    logger.debug("Receiving request on thread: " + Thread.currentThread().getName());
    return userService.create(user)
        .map(it -> {
            logger.debug("Sending response on thread: " + Thread.currentThread().getName());
            return ResponseEntity.status(HttpStatus.CREATED).body(it);
        })
        .mapError(DuplicateKeyException.class, e -> new SomeSpecialException(e.getMessage(), e));
}

记录
请注意,如果您想记录某些内容,有几个比执行 amap和返回更好的选择it

  • doOnNext方法为此量身定制:对一个反应信号做出反应(在这种情况下,onNext发出一个值)并执行一些非变异操作,使输出序列与源序列完全相同。doOn 的“副作用”可以是写入控制台或增加统计计数器……还有 doOnComplete、doOnError、doOnSubscribe、doOnCancel 等……

  • log只需按上面的顺序记录所有事件。它将检测您是否使用 SLF4J,如果是,则在 DEBUG 级别使用配置的记录器。否则它将使用 JDK 日志记录功能(因此您还需要对其进行配置以显示调试级别日志)。

在反应式编程中,关于事务或者更确切地说是任何依赖于ThreadLocal
ThreadLocal 和线程粘性的东西可能会出现问题,因为在整个序列中底层执行模型保持不变的保证较少。AFlux可以分几个步骤执行,每个步骤都在不同的Scheduler(以及线程或线程池中)。即使在特定步骤,一个值也可以由底层线程池的线程 A 处理,而稍后到达的下一个值将在线程 B 上处理。

在这种情况下,依赖 Thread Local 并不那么简单,我们目前正在积极致力于提供更适合响应式世界的替代方案。

您创建一个连接池大小的池的想法很好,但不一定足够,因为事务通量使用多个线程的潜力,因此可能会污染事务的一些线程。

当池用完线程时会发生什么
如果你使用一个特定Scheduler的来隔离像这里这样的阻塞行为,一旦它用完线程,它会抛出一个RejectedExecutionException.

于 2017-03-31T08:09:21.220 回答