我目前正在使用SpringBoot 2、 netty和 jOOQ 上的 spring-boot-starter-webflux开发应用程序。
以下是我经过数小时的研究和 stackoverflow 搜索后得出的代码。我已经建立了很多日志记录,以便查看哪个线程上发生了什么。
用户控制器:
@RequestMapping(value = "/user", method = RequestMethod.POST)
public Mono<ResponseEntity<Integer>> createUser(@RequestBody ImUser user) {
return Mono.just(user)
.map(it -> {
logger.debug("Receiving request on thread: " + Thread.currentThread().getName());
return it;
})
.map(userService::create)
.map(it -> {
logger.debug("Sending response on thread: " + Thread.currentThread().getName());
return ResponseEntity.status(HttpStatus.CREATED).body(it);
})
.mapError(DuplicateKeyException.class, e -> new SomeSpecialException(e.getMessage(), e));
}
用户服务:
public int create(ImUser user) {
return Mono.just(user)
.subscribeOn(Schedulers.elastic())
.map(u -> {
logger.debug("UserService thread: " + Thread.currentThread().getName());
return imUserDao.insertUser(u);
})
.block();
}
用户道:
@Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.READ_COMMITTED, rollbackFor = Exception.class)
public int insertUser(ImUser user) {
logger.debug("Insert DB on thread: " + Thread.currentThread().getName());
return dsl.insertInto(IM_USER,IM_USER.VERSION, IM_USER.FIRST_NAME, IM_USER.LAST_NAME, IM_USER.BIRTHDATE, IM_USER.GENDER)
.values(1, user.getFirstName(), user.getLastName(), user.getBirthdate(), user.getGender())
.returning(IM_USER.ID)
.fetchOne()
.getId();
}
代码按预期工作,“接收请求”和“发送响应”都在同一个线程(reactor-http-server-epoll-x)上运行,而阻塞代码(对imUserDao.insertUser(u)的调用)运行在弹性调度程序线程(elastic-x)。事务绑定到调用带注释的方法的线程(即 elastic-x),因此按预期工作(我已经用不同的方法测试了它,这里没有发布,以保持简单)。
这是一个日志示例:
20:57:21,384 DEBUG admin.UserController| Receiving request on thread: reactor-http-server-epoll-7
20:57:21,387 DEBUG admin.UserService| UserService thread: elastic-2
20:57:21,391 DEBUG admin.ExtendedUserDao| Insert DB on thread: elastic-2
20:57:21,393 DEBUG tools.LoggerListener| Executing query
...
20:57:21,401 DEBUG tools.StopWatch| Finishing : Total: 9.355ms, +3.355ms
20:57:21,409 DEBUG admin.UserController| Sending response on thread: reactor-http-server-epoll-7
我研究响应式编程很长时间了,但从来没有真正编写过任何响应式编程。现在我是,我想知道我是否做得正确。所以这是我的问题:
1.上面的代码是处理传入的HTTP请求,查询DB然后响应的好方法吗?为了我的理智,请忽略我内置的 logger.debug(...) 调用 :) 我有点期望有一个Flux< ImUser>作为控制器方法的参数,从某种意义上说,我有多个潜在请求的流,这些请求将在某个时间点出现,并且都将以相同的方式处理。相反,我发现的示例创建了Mono.from(...); 每次有请求进来。
2.在 UserService 中创建的第二个 Mono ( Mono.just(user) ) 感觉有些别扭。我知道我需要启动一个新流才能在弹性调度程序上运行代码,但是没有操作员可以做到这一点吗?
3.从代码的编写方式,我了解到UserService中的Mono会被阻塞,直到DB操作完成,但服务请求的原始流没有被阻塞。它是否正确?
4.我打算将Schedulers.elastic()替换为可以配置工作线程数量的并行调度程序。这个想法是最大工作线程数应该与最大数据库连接数相同。当 Scheduler 内的所有工作线程都忙时会发生什么?那是背压突然出现的时候吗?
5.我最初希望在我的控制器中包含以下代码:
return userService.create(user)
.map(it -> ResponseEntity.status(HttpStatus.CREATED).body(it))
.mapError(DuplicateKeyException.class, e -> new SomeSpecialException(e.getMessage(), e));
但我无法做到这一点并让事情在正确的线程中运行。有没有办法在我的代码中实现这一点?
任何帮助将不胜感激。谢谢!