我在 Tomcat 上有一个带有自动配置数据源的 Spring Boot 服务。
spring.datasource.url=jdbc:postgresql://localhost:5432/postgres
spring.datasource.driverClassName=org.postgresql.Driver
spring.datasource.username=postgres
spring.datasource.password=example
现在我正在探索响应式访问数据库的可能性。我的connectionFactory配置如下:
@Configuration
public class ReactiveConfiguration {
@Bean
public ConnectionFactory connectionFactory() {
return ConnectionFactories.get(ConnectionFactoryOptions.builder()
.option(HOST, "localhost")// optional, defaults to 5432
.option(PORT, 5432)
.option(USER, "postgres")
.option(PASSWORD, "example")
.option(DATABASE, "postgres")
.build());
}
}
我的“道”如下:
@Service
@RequiredArgsConstructor
@Slf4j
public class ReactiveTest {
private final ConnectionFactory cf;
public Flux<Long> findAllId() {
return Flux.from(cf.create()).flatMap(c -> Mono.from(c.beginTransaction()).flatMapMany(cr -> Flux.from(c.createStatement("select kls_id from am_fibre_on_location limit 10 offset 0").execute())
.map(res -> res.map((row, meta) -> row.get("kls_id", Long.class)))).flatMap(Flux::from));
}
}
控制器
@GetMapping(value = "/reactive")
@ApiOperation(nickname = "reactive",
value = "reactive")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "OK"),
@ApiResponse(code = 400, message = "Bad request"),
@ApiResponse(code = 500, message = "Internal Server Error")
})
public List<Long> reactive () {
List<Long> longs = new ArrayList<>();
Disposable dis = reactiveTest.findAllId().subscribe(longs::add);
while (!dis.isDisposed()) {}
return longs;
}
当我在一些虚拟服务(只有两个类和控制器)中使用它时,一切都按预期工作。假人的Pom
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-postgresql</artifactId>
<version>1.0.0.M7</version>
</dependency>
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-spi</artifactId>
<version>1.0.0.M7</version>
</dependency>
但是当我尝试在现有服务中使用它时,我得到
{reactor.netty.channel.AbortedException: Cannot publish the contentMonoLift, the connection has been closed
\r\n\tat reactor.netty.FutureMono$DeferredWriteMono.subscribe(FutureMono.java:321)
\r\n\tat reactor.core.publisher.Mono.subscribe(Mono.java:3080)
\r\n\tat reactor.netty.NettyOutbound.subscribe(NettyOutbound.java:317)
\r\n\tat reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:418)
\r\n\tat reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onNext(FluxConcatMap.java:241)
\r\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:82)
\r\n\tat reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:185)
\r\n\tat reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:417)
\r\n\tat reactor.core.publisher.EmitterProcessor.onNext(EmitterProcessor.java:265)
\r\n\tat reactor.core.publisher.FluxCreate$IgnoreSink.next(FluxCreate.java:568)
\r\n\tat reactor.core.publisher.FluxCreate$SerializedSink.next(FluxCreate.java:146)
\r\n\tat io.r2dbc.postgresql.client.ReactorNettyClient.lambda$null$16(ReactorNettyClient.java:252)
\r\n\tat reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:130)
\r\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:82)
\r\n\tat reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:1637)
\r\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.request(ScopePassingSpanSubscriber.java:70)
\r\n\tat reactor.core.publisher.LambdaSubscriber.onSubscribe(LambdaSubscriber.java:89)
\r\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onSubscribe(ScopePassingSpanSubscriber.java:64)
\r\n\tat reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54)
\r\n\tat reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46)
\r\n\tat reactor.core.publisher.FluxSourceMono.subscribe(FluxSourceMono.java:46)
\r\n\tat reactor.core.publisher.Flux.subscribe(Flux.java:6873)
\r\n\tat reactor.core.publisher.Flux.subscribeWith(Flux.java:7040)
\r\n\tat reactor.core.publisher.Flux.subscribe(Flux.java:6866)
\r\n\tat reactor.core.publisher.Flux.subscribe(Flux.java:6830)
\r\n\tat reactor.core.publisher.Flux.subscribe(Flux.java:6800)
\r\n\tat io.r2dbc.postgresql.client.ReactorNettyClient.lambda$exchange$17(ReactorNettyClient.java:248)
\r\n\tat reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:53)
\r\n\tat reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46)
\r\n\tat reactor.core.publisher.MonoFlatMapMany.subscribe(MonoFlatMapMany.java:49)
\r\n\tat reactor.core.publisher.FluxLift.subscribe(FluxLift.java:46)
\r\n\tat reactor.core.publisher.FluxWindowPredicate.subscribe(FluxWindowPredicate.java:88)
\r\n\tat reactor.core.publisher.FluxLift.subscribe(FluxLift.java:46)
\r\n\tat reactor.core.publisher.FluxMap.subscribe(FluxMap.java:62)
\r\n\tat reactor.core.publisher.FluxLift.subscribe(FluxLift.java:46)
\r\n\tat reactor.core.publisher.FluxMap.subscribe(FluxMap.java:62)
\r\n\tat reactor.core.publisher.FluxLift.subscribe(FluxLift.java:46)
\r\n\tat reactor.core.publisher.Flux.subscribe(Flux.java:6873)
\r\n\tat reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:372)
\r\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:82)
\r\n\tat reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1080)
\r\n\tat reactor.core.publisher.MonoDelayUntil$DelayUntilCoordinator.signal(MonoDelayUntil.java:206)
\r\n\tat reactor.core.publisher.MonoDelayUntil$DelayUntilTrigger.onComplete(MonoDelayUntil.java:285)
\r\n\tat reactor.core.publisher.Operators.complete(Operators.java:125)
\r\n\tat reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:45)
\r\n\tat reactor.core.publisher.Mono.subscribe(Mono.java:3080)
\r\n\tat reactor.core.publisher.MonoDelayUntil$DelayUntilCoordinator.subscribeNextTrigger(MonoDelayUntil.java:168)
\r\n\tat reactor.core.publisher.MonoDelayUntil$DelayUntilCoordinator.onNext(MonoDelayUntil.java:127)
\r\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:82)
\r\n\tat reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:108)
\r\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:82)
\r\n\tat reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1080)
\r\n\tat reactor.core.publisher.MonoDelayUntil$DelayUntilCoordinator.signal(MonoDelayUntil.java:206)
\r\n\tat reactor.core.publisher.MonoDelayUntil$DelayUntilTrigger.onComplete(MonoDelayUntil.java:285)
\r\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onComplete(ScopePassingSpanSubscriber.java:91)
\r\n\tat reactor.core.publisher.FluxHandle$HandleSubscriber.onComplete(FluxHandle.java:174)
\r\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onComplete(ScopePassingSpanSubscriber.java:91)
\r\n\tat reactor.core.publisher.FluxHandle$HandleSubscriber.onComplete(FluxHandle.java:174)
\r\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onComplete(ScopePassingSpanSubscriber.java:91)
\r\n\tat reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:175)
\r\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:82)
\r\n\tat reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:140)
\r\n\tat io.r2dbc.postgresql.client.ReactorNettyClient.lambda$new$6(ReactorNettyClient.java:143)
\r\n\tat reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:236)
\r\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onComplete(ScopePassingSpanSubscriber.java:91)
\r\n\tat reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:245)
\r\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onComplete(ScopePassingSpanSubscriber.java:91)
\r\n\tat reactor.core.publisher.FluxWindowPredicate$WindowPredicateMain.checkTerminated(FluxWindowPredicate.java:466)
\r\n\tat reactor.core.publisher.FluxWindowPredicate$WindowPredicateMain.drainLoop(FluxWindowPredicate.java:414)
\r\n\tat reactor.core.publisher.FluxWindowPredicate$WindowPredicateMain.drain(FluxWindowPredicate.java:358)
\r\n\tat reactor.core.publisher.FluxWindowPredicate$WindowPredicateMain.onComplete(FluxWindowPredicate.java:269)
\r\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onComplete(ScopePassingSpanSubscriber.java:91)
\r\n\tat reactor.core.publisher.FluxHandle$HandleSubscriber.onComplete(FluxHandle.java:174)
\r\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onComplete(ScopePassingSpanSubscriber.java:91)
\r\n\tat reactor.core.publisher.FluxHandle$HandleSubscriber.onComplete(FluxHandle.java:174)
\r\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onComplete(ScopePassingSpanSubscriber.java:91)
\r\n\tat reactor.core.publisher.FluxHandle$HandleSubscriber.onComplete(FluxHandle.java:174)
\r\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onComplete(ScopePassingSpanSubscriber.java:91)
\r\n\tat reactor.core.publisher.FluxHandle$HandleSubscriber.onComplete(FluxHandle.java:174)
\r\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onComplete(ScopePassingSpanSubscriber.java:91)
\r\n\tat reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:245)
\r\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onComplete(ScopePassingSpanSubscriber.java:91)
\r\n\tat reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:349)
\r\n\tat reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onComplete(FluxConcatMap.java:265)
\r\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onComplete(ScopePassingSpanSubscriber.java:91)
\r\n\tat reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:245)
\r\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onComplete(ScopePassingSpanSubscriber.java:91)
\r\n\tat reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:130)
\r\n\tat reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:378)
\r\n\tat reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:202)
\r\n\tat reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:343)
\r\n\tat reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:325)
\r\n\tat reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:372)
\r\n\tat reactor.netty.channel.ChannelOperations.onInboundClose(ChannelOperations.java:332)
\r\n\tat reactor.netty.channel.ChannelOperationsHandler.channelInactive(ChannelOperationsHandler.java:121)
\r\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
\r\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
\r\n\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
\r\n\tat io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
\r\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
\r\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
\r\n\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
\r\n\tat io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1429)
\r\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
\r\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
\r\n\tat io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:947)
\r\n\tat io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:822)
\r\n\tat io.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:163)
\r\n\tat io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java)
\r\n\tat io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
\r\n\tat io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
\r\n\tat io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
\r\n\tat java.lang.Thread.run(Thread.java:748)
\r\n","message":"Connection Error","source":"io.r2dbc.postgresql.client.ReactorNettyClient"
从那个堆栈跟踪我无法理解这个问题。我看到postgresql甚至没有打开r2dbc-connection(来自pgAdmin,当虚拟打开它时)。jdbc 和 r2dbc 是否以某种方式发生冲突?还是 r2dbc 不能在 tomcat 上工作?还是与 HikariCP 冲突?