尝试获取 SQL Server 连接池时出现错误。
马文:
<modelVersion>4.0.0</modelVersion>
<groupId>org.springframework</groupId>
<artifactId>accessing-data-r2dbc</artifactId>
<version>0.1.0</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.1.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<name>R2DBC</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.9</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.projectreactor</groupId>
<artifactId>reactor-spring</artifactId>
<version>1.0.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-r2dbc</artifactId>
</dependency>
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-mssql</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-pool</artifactId>
<version> 0.8.3.RELEASE </version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
使用 ReactiveCrudRepository:
public class R2DBCConfiguration extends AbstractR2dbcConfiguration {
@Bean
@Override
public ConnectionFactory connectionFactory() {
ConnectionFactory connectionFactory = ConnectionFactories.get(ConnectionFactoryOptions.builder()
.option(ConnectionFactoryOptions.DRIVER, "pool")
.option(ConnectionFactoryOptions.PROTOCOL, "mssql") // driver identifier, PROTOCOL is delegated as DRIVER by the pool.
.option(ConnectionFactoryOptions.HOST, HOST)
.option(ConnectionFactoryOptions.PORT, PORT)
.option(ConnectionFactoryOptions.USER, user)
.option(ConnectionFactoryOptions.PASSWORD, passwd)
.option(ConnectionFactoryOptions.DATABASE, mydatabase)
.option(MAX_SIZE, 30)
.build());
ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactory)
.maxIdleTime(Duration.ofMinutes(30))
.initialSize(10)
.maxSize(10)
.maxCreateConnectionTime(Duration.ofSeconds(1))
.build();
return new ConnectionPool(configuration);
}
}
我可以通过 MssqlConnectionFactory 获得连接,但是当我们需要同时更新超过 1000 条记录时我们会发现错误,所以我认为连接池应该更适合我们。
这是测试代码:
@Test
void simpleTest() {
pushMesgApiRepository.findAllPushStatus("4")
.subscribe(
obj -> {
obj.setPUSH_TIME((new java.util.Date()).toInstant()
.atZone(ZoneId.systemDefault()).toLocalDateTime());
pushMesgApiRepository.save(obj)
.subscribe(
elem->{
System.out.println("Register updated: " + elem); },
err->{
System.out.println("Register failed: " + obj);
err.printStackTrace();
});
},
err->err.getSuppressed());
}
这是我得到的错误:
2020-07-28 16:01:59.552 INFO 9648 --- [ main] com.example.demo.DemoApplicationTests : Starting DemoApplicationTests on LAPTOP-NQ3UUEE1 with PID 9648 (started by Banana in D:\MyWorkSpace\R2DBC\R2DBC)
2020-07-28 16:01:59.554 INFO 9648 --- [ main] com.example.demo.DemoApplicationTests : No active profile set, falling back to default profiles: default
2020-07-28 16:01:59.925 INFO 9648 --- [ main] .s.d.r.c.RepositoryConfigurationDelegate : Bootstrapping Spring Data R2DBC repositories in DEFAULT mode.
2020-07-28 16:02:00.058 INFO 9648 --- [ main] .s.d.r.c.RepositoryConfigurationDelegate : Finished Spring Data repository scanning in 128ms. Found 1 R2DBC repository interfaces.
2020-07-28 16:02:00.654 WARN 9648 --- [ main] io.r2dbc.pool.ConnectionPool : Creating ConnectionPool using another ConnectionPool [ConnectionPool[Microsoft SQL Server]] as ConnectionFactory
2020-07-28 16:02:01.371 INFO 9648 --- [ main] com.example.demo.DemoApplicationTests : Started DemoApplicationTests in 2.15 seconds (JVM running for 3.297)
2020-07-28 16:02:01.808 WARN 9648 --- [extShutdownHook] o.s.b.f.support.DisposableBeanAdapter : Destroy method 'close' on bean with name 'connectionFactory' threw an exception: java.lang.NoSuchMethodError: 'boolean reactor.pool.InstrumentedPool.isDisposed()'
我也尝试通过 DatabaseClient 对象而不是 ReactiveCRUDRepository 来做到这一点
ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactory)
.maxIdleTime(Duration.ofMinutes(30))
.initialSize(10)
.maxSize(30)
//.maxCreateConnectionTime(Duration.ofSeconds(1))
.build();
client = DatabaseClient.create(new ConnectionPool(configuration));
我收到了更详细的错误消息:(在我看来,错误与依赖项有关)
2020-07-28 16:17:33.785 ERROR 5928 --- [ctor-http-nio-1] reactor.core.publisher.Operators : Operator called default onErrorDropped
java.lang.NoSuchMethodError: 'reactor.core.publisher.Mono reactor.pool.InstrumentedPool.acquire()'
at io.r2dbc.pool.ConnectionPool.lambda$new$6(ConnectionPool.java:96) ~[r2dbc-pool-0.8.3.RELEASE.jar:0.8.3.RELEASE]
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:44) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
at reactor.core.publisher.FluxRetry$RetrySubscriber.resubscribe(FluxRetry.java:110) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
at reactor.core.publisher.MonoRetry.subscribeOrReturn(MonoRetry.java:49) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
at reactor.core.publisher.Mono.subscribe(Mono.java:4204) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:97) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:165) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:134) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:185) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2344) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.request(FluxHandleFuseable.java:243) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:162) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onSubscribe(MonoFlatMap.java:103) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onSubscribe(FluxHandleFuseable.java:148) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
at reactor.core.publisher.MonoCurrentContext.subscribe(MonoCurrentContext.java:35) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
at reactor.core.publisher.Mono.subscribe(Mono.java:4219) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
at reactor.core.publisher.MonoUsingWhen.subscribe(MonoUsingWhen.java:96) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
at reactor.core.publisher.Mono.subscribe(Mono.java:4219) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
at reactor.core.publisher.Mono.subscribeWith(Mono.java:4330) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
at reactor.core.publisher.Mono.subscribe(Mono.java:4190) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
at reactor.core.publisher.Mono.subscribe(Mono.java:4126) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
at com.example.demo.DemoApplicationTests.lambda$12(DemoApplicationTests.java:189) ~[test-classes/:na]
at reactor.core.publisher.LambdaMonoSubscriber.onNext(LambdaMonoSubscriber.java:168) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1782) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:241) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:385) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onNext(FluxContextStart.java:96) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:287) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:330) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1782) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:152) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:365) ~[reactor-netty-0.9.8.RELEASE.jar:0.9.8.RELEASE]
at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:368) ~[reactor-netty-0.9.8.RELEASE.jar:0.9.8.RELEASE]
at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:424) ~[reactor-netty-0.9.8.RELEASE.jar:0.9.8.RELEASE]
at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:613) ~[reactor-netty-0.9.8.RELEASE.jar:0.9.8.RELEASE]
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:96) ~[reactor-netty-0.9.8.RELEASE.jar:0.9.8.RELEASE]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.50.Final.jar:4.1.50.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.50.Final.jar:4.1.50.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.50.Final.jar:4.1.50.Final]
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) ~[netty-codec-4.1.50.Final.jar:4.1.50.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.50.Final.jar:4.1.50.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.50.Final.jar:4.1.50.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.50.Final.jar:4.1.50.Final]
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) ~[netty-transport-4.1.50.Final.jar:4.1.50.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) ~[netty-codec-4.1.50.Final.jar:4.1.50.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) ~[netty-codec-4.1.50.Final.jar:4.1.50.Final]
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) ~[netty-transport-4.1.50.Final.jar:4.1.50.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.50.Final.jar:4.1.50.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.50.Final.jar:4.1.50.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.50.Final.jar:4.1.50.Final]
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1518) ~[netty-handler-4.1.50.Final.jar:4.1.50.Final]
at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1267) ~[netty-handler-4.1.50.Final.jar:4.1.50.Final]
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1314) ~[netty-handler-4.1.50.Final.jar:4.1.50.Final]
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501) ~[netty-codec-4.1.50.Final.jar:4.1.50.Final]
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:440) ~[netty-codec-4.1.50.Final.jar:4.1.50.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276) ~[netty-codec-4.1.50.Final.jar:4.1.50.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.50.Final.jar:4.1.50.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.50.Final.jar:4.1.50.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.50.Final.jar:4.1.50.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.50.Final.jar:4.1.50.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.50.Final.jar:4.1.50.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.50.Final.jar:4.1.50.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.50.Final.jar:4.1.50.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) ~[netty-transport-4.1.50.Final.jar:4.1.50.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) ~[netty-transport-4.1.50.Final.jar:4.1.50.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) ~[netty-transport-4.1.50.Final.jar:4.1.50.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) ~[netty-transport-4.1.50.Final.jar:4.1.50.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.50.Final.jar:4.1.50.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.50.Final.jar:4.1.50.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.50.Final.jar:4.1.50.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.50.Final.jar:4.1.50.Final]
at java.base/java.lang.Thread.run(Thread.java:830) ~[na:na]