1

尝试获取 SQL Server 连接池时出现错误。

马文:

<modelVersion>4.0.0</modelVersion>
<groupId>org.springframework</groupId>
<artifactId>accessing-data-r2dbc</artifactId>
<version>0.1.0</version>
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.1.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<name>R2DBC</name>
<description>Demo project for Spring Boot</description>

<properties>
    <java.version>1.9</java.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
        <exclusions>
            <exclusion>
                <groupId>org.junit.vintage</groupId>
                <artifactId>junit-vintage-engine</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.projectreactor</groupId>
        <artifactId>reactor-spring</artifactId>
        <version>1.0.1.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.data</groupId>
        <artifactId>spring-data-r2dbc</artifactId>          
    </dependency>
    <dependency>
        <groupId>io.r2dbc</groupId>
        <artifactId>r2dbc-mssql</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <dependency>
      <groupId>io.r2dbc</groupId>
      <artifactId>r2dbc-pool</artifactId>
      <version> 0.8.3.RELEASE </version>
    </dependency>
    
    
    <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <scope>provided</scope>
    </dependency>
    
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>
</build>

</project>

使用 ReactiveCrudRepository:

public class R2DBCConfiguration extends AbstractR2dbcConfiguration {
    @Bean
    @Override
    public ConnectionFactory  connectionFactory() {
        ConnectionFactory connectionFactory = ConnectionFactories.get(ConnectionFactoryOptions.builder()
                   .option(ConnectionFactoryOptions.DRIVER, "pool")
                   .option(ConnectionFactoryOptions.PROTOCOL, "mssql") // driver identifier, PROTOCOL is delegated as DRIVER by the pool.
                   .option(ConnectionFactoryOptions.HOST, HOST)
                   .option(ConnectionFactoryOptions.PORT, PORT) 
                   .option(ConnectionFactoryOptions.USER, user)
                   .option(ConnectionFactoryOptions.PASSWORD, passwd)
                   .option(ConnectionFactoryOptions.DATABASE, mydatabase)
                   .option(MAX_SIZE, 30)                   
                   .build());
         ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactory)
                    .maxIdleTime(Duration.ofMinutes(30))
                    .initialSize(10)
                    .maxSize(10)
                    .maxCreateConnectionTime(Duration.ofSeconds(1))
                    .build();
        
        
        return new ConnectionPool(configuration);
    }

}

我可以通过 MssqlConnectionFactory 获得连接,但是当我们需要同时更新超过 1000 条记录时我们会发现错误,所以我认为连接池应该更适合我们。

这是测试代码:

@Test
    void simpleTest() {
        pushMesgApiRepository.findAllPushStatus("4")
        .subscribe(
            obj -> {
                obj.setPUSH_TIME((new java.util.Date()).toInstant()
                                      .atZone(ZoneId.systemDefault()).toLocalDateTime());
                            
                pushMesgApiRepository.save(obj)
                .subscribe(
                        elem->{
                            System.out.println("Register updated: " + elem);                        },
                        err->{
                            System.out.println("Register failed: "  + obj);
                            err.printStackTrace();
                        });
            },
            err->err.getSuppressed());
        
        
        }

这是我得到的错误:

2020-07-28 16:01:59.552  INFO 9648 --- [           main] com.example.demo.DemoApplicationTests    : Starting DemoApplicationTests on LAPTOP-NQ3UUEE1 with PID 9648 (started by Banana in D:\MyWorkSpace\R2DBC\R2DBC)
2020-07-28 16:01:59.554  INFO 9648 --- [           main] com.example.demo.DemoApplicationTests    : No active profile set, falling back to default profiles: default
2020-07-28 16:01:59.925  INFO 9648 --- [           main] .s.d.r.c.RepositoryConfigurationDelegate : Bootstrapping Spring Data R2DBC repositories in DEFAULT mode.
2020-07-28 16:02:00.058  INFO 9648 --- [           main] .s.d.r.c.RepositoryConfigurationDelegate : Finished Spring Data repository scanning in 128ms. Found 1 R2DBC repository interfaces.
2020-07-28 16:02:00.654  WARN 9648 --- [           main] io.r2dbc.pool.ConnectionPool             : Creating ConnectionPool using another ConnectionPool [ConnectionPool[Microsoft SQL Server]] as ConnectionFactory
2020-07-28 16:02:01.371  INFO 9648 --- [           main] com.example.demo.DemoApplicationTests    : Started DemoApplicationTests in 2.15 seconds (JVM running for 3.297)
2020-07-28 16:02:01.808  WARN 9648 --- [extShutdownHook] o.s.b.f.support.DisposableBeanAdapter    : Destroy method 'close' on bean with name 'connectionFactory' threw an exception: java.lang.NoSuchMethodError: 'boolean reactor.pool.InstrumentedPool.isDisposed()'

我也尝试通过 DatabaseClient 对象而不是 ReactiveCRUDRepository 来做到这一点

ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactory)
                    .maxIdleTime(Duration.ofMinutes(30))
                    .initialSize(10)
                    .maxSize(30)
                    //.maxCreateConnectionTime(Duration.ofSeconds(1))
                    .build();
        
        
        client = DatabaseClient.create(new ConnectionPool(configuration));

我收到了更详细的错误消息:(在我看来,错误与依赖项有关)

2020-07-28 16:17:33.785 ERROR 5928 --- [ctor-http-nio-1] reactor.core.publisher.Operators         : Operator called default onErrorDropped

java.lang.NoSuchMethodError: 'reactor.core.publisher.Mono reactor.pool.InstrumentedPool.acquire()'
    at io.r2dbc.pool.ConnectionPool.lambda$new$6(ConnectionPool.java:96) ~[r2dbc-pool-0.8.3.RELEASE.jar:0.8.3.RELEASE]
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:44) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
    at reactor.core.publisher.FluxRetry$RetrySubscriber.resubscribe(FluxRetry.java:110) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
    at reactor.core.publisher.MonoRetry.subscribeOrReturn(MonoRetry.java:49) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
    at reactor.core.publisher.Mono.subscribe(Mono.java:4204) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:97) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:165) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:134) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
    at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:185) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
    at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2344) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
    at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.request(FluxHandleFuseable.java:243) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:162) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onSubscribe(MonoFlatMap.java:103) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
    at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onSubscribe(FluxHandleFuseable.java:148) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
    at reactor.core.publisher.MonoCurrentContext.subscribe(MonoCurrentContext.java:35) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
    at reactor.core.publisher.Mono.subscribe(Mono.java:4219) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
    at reactor.core.publisher.MonoUsingWhen.subscribe(MonoUsingWhen.java:96) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
    at reactor.core.publisher.Mono.subscribe(Mono.java:4219) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
    at reactor.core.publisher.Mono.subscribeWith(Mono.java:4330) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
    at reactor.core.publisher.Mono.subscribe(Mono.java:4190) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
    at reactor.core.publisher.Mono.subscribe(Mono.java:4126) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
    at com.example.demo.DemoApplicationTests.lambda$12(DemoApplicationTests.java:189) ~[test-classes/:na]
    at reactor.core.publisher.LambdaMonoSubscriber.onNext(LambdaMonoSubscriber.java:168) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1782) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
    at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:241) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:385) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
    at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onNext(FluxContextStart.java:96) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:287) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
    at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:330) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1782) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
    at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:152) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
    at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
    at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
    at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:365) ~[reactor-netty-0.9.8.RELEASE.jar:0.9.8.RELEASE]
    at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:368) ~[reactor-netty-0.9.8.RELEASE.jar:0.9.8.RELEASE]
    at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:424) ~[reactor-netty-0.9.8.RELEASE.jar:0.9.8.RELEASE]
    at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:613) ~[reactor-netty-0.9.8.RELEASE.jar:0.9.8.RELEASE]
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:96) ~[reactor-netty-0.9.8.RELEASE.jar:0.9.8.RELEASE]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.50.Final.jar:4.1.50.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.50.Final.jar:4.1.50.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.50.Final.jar:4.1.50.Final]
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) ~[netty-codec-4.1.50.Final.jar:4.1.50.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.50.Final.jar:4.1.50.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.50.Final.jar:4.1.50.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.50.Final.jar:4.1.50.Final]
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) ~[netty-transport-4.1.50.Final.jar:4.1.50.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) ~[netty-codec-4.1.50.Final.jar:4.1.50.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) ~[netty-codec-4.1.50.Final.jar:4.1.50.Final]
    at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) ~[netty-transport-4.1.50.Final.jar:4.1.50.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.50.Final.jar:4.1.50.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.50.Final.jar:4.1.50.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.50.Final.jar:4.1.50.Final]
    at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1518) ~[netty-handler-4.1.50.Final.jar:4.1.50.Final]
    at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1267) ~[netty-handler-4.1.50.Final.jar:4.1.50.Final]
    at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1314) ~[netty-handler-4.1.50.Final.jar:4.1.50.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501) ~[netty-codec-4.1.50.Final.jar:4.1.50.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:440) ~[netty-codec-4.1.50.Final.jar:4.1.50.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276) ~[netty-codec-4.1.50.Final.jar:4.1.50.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.50.Final.jar:4.1.50.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.50.Final.jar:4.1.50.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.50.Final.jar:4.1.50.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.50.Final.jar:4.1.50.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.50.Final.jar:4.1.50.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.50.Final.jar:4.1.50.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.50.Final.jar:4.1.50.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) ~[netty-transport-4.1.50.Final.jar:4.1.50.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) ~[netty-transport-4.1.50.Final.jar:4.1.50.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) ~[netty-transport-4.1.50.Final.jar:4.1.50.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) ~[netty-transport-4.1.50.Final.jar:4.1.50.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.50.Final.jar:4.1.50.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.50.Final.jar:4.1.50.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.50.Final.jar:4.1.50.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.50.Final.jar:4.1.50.Final]
    at java.base/java.lang.Thread.run(Thread.java:830) ~[na:na]
4

0 回答 0