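I'm fairly new to the reactive world and recently ran into a very strange problem. Context:
- using Spring Data with R2DBC on MS SQL
- running on SB 2.3, OpenJDK 14 (dependencies below)
- using transactions via annotations
- using "repositories" built purely on DatabaseClient (code sample below)
When trying to execute:
- a delete from master.abc.DEF, followed by
- an invalid (intentionally so) insert
I get the following exception: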
2020-06-04 00:46:16.345 ERROR [....] 56223 --- [actor-tcp-nio-2] reactor.core.publisher.Operators : Operator called default onErrorDropped
io.r2dbc.mssql.ExceptionFactory$MssqlNonTransientException: Cannot insert the value NULL into column 'some_id', table 'master.abc.DEF'; column does not allow nulls. INSERT fails.
at io.r2dbc.mssql.ExceptionFactory.createException(ExceptionFactory.java:152)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.FluxLift] :
reactor.core.publisher.Flux.doOnComplete
io.r2dbc.mssql.RpcQueryMessageFlow.exchange(RpcQueryMessageFlow.java:154)
Error has been observed at the following site(s):
|_ Flux.doOnComplete ⇢ at io.r2dbc.mssql.RpcQueryMessageFlow.exchange(RpcQueryMessageFlow.java:154)
|_ Flux.filter ⇢ at io.r2dbc.mssql.RpcQueryMessageFlow.exchange(RpcQueryMessageFlow.java:160)
|_ Flux.doOnCancel ⇢ at io.r2dbc.mssql.RpcQueryMessageFlow.exchange(RpcQueryMessageFlow.java:161)
|_ Flux.doOnSubscribe ⇢ at io.r2dbc.mssql.RpcQueryMessageFlow.exchange(RpcQueryMessageFlow.java:163)
Stack trace:
at io.r2dbc.mssql.ExceptionFactory.createException(ExceptionFactory.java:152)
at io.r2dbc.mssql.ExceptionFactory.createException(ExceptionFactory.java:181)
at io.r2dbc.mssql.RpcQueryMessageFlow.lambda$exchange$1(RpcQueryMessageFlow.java:148)
at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:96)
at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:426)
at reactor.core.publisher.EmitterProcessor.onNext(EmitterProcessor.java:268)
at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:90)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203)
at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:90)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203)
at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:90)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203)
at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:90)
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:178)
at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:90)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242)
at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:90)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:192)
at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:426)
at reactor.core.publisher.EmitterProcessor.onNext(EmitterProcessor.java:268)
at io.r2dbc.mssql.client.ReactorNettyClient$1.next(ReactorNettyClient.java:237)
at io.r2dbc.mssql.client.ReactorNettyClient$1.next(ReactorNettyClient.java:197)
at io.r2dbc.mssql.message.token.Tabular$TabularDecoder.decode(Tabular.java:425)
at io.r2dbc.mssql.client.ConnectionState$4$1.decode(ConnectionState.java:206)
at io.r2dbc.mssql.client.StreamDecoder.withState(StreamDecoder.java:137)
at io.r2dbc.mssql.client.StreamDecoder.decode(StreamDecoder.java:109)
at io.r2dbc.mssql.client.ReactorNettyClient.lambda$new$6(ReactorNettyClient.java:247)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:189)
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:220)
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:354)
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:352)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:96)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93)
at io.r2dbc.mssql.client.ssl.TdsSslHandler.channelRead(TdsSslHandler.java:402)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:832)
As a result, the transaction is not rolled back and the underlying table stays locked :/
The flow (simplified) is:
@Transactional
public Mono<X> replaceX(final String someX)
    return someRepository.deleteByX(someX)
        .then(...making a webClient-based-call)
        .flatMap(input -> someRepository.create(detailsFrom(input)))
        ....
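Adding Hooks.onErrorDropped(error -> ... somehow makes the problem go away, but that doesn't seem like a proper solution, and I would still like to know the actual root cause. I assume it lies in my code/approach rather than in r2dbc.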
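For context on the "Operator called default onErrorDropped" log line: as far as I understand it, Reactor sends an error to that hook when the error arrives after the subscriber has already terminated or cancelled, so there is nobody left to deliver it to. Below is a minimal plain-Java sketch of that idea. It is only a model, not Reactor's implementation, and the names TerminalGuard and Downstream are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

class DroppedErrorSketch {

    /** Hypothetical stand-in for a downstream subscriber; not a Reactor type. */
    interface Downstream {
        void onError(Throwable t);
        void onComplete();
    }

    /** Forwards only the first terminal signal; later errors go to the dropped-error hook. */
    static final class TerminalGuard {
        private final AtomicBoolean done = new AtomicBoolean();
        private final Downstream downstream;
        private final Consumer<Throwable> onErrorDropped;

        TerminalGuard(Downstream downstream, Consumer<Throwable> onErrorDropped) {
            this.downstream = downstream;
            this.onErrorDropped = onErrorDropped;
        }

        /** After cancellation the downstream no longer accepts signals. */
        void cancel() {
            done.set(true);
        }

        void complete() {
            if (done.compareAndSet(false, true)) {
                downstream.onComplete();
            }
        }

        void error(Throwable t) {
            if (done.compareAndSet(false, true)) {
                downstream.onError(t);      // normal path: the subscriber sees the error
            } else {
                onErrorDropped.accept(t);   // late error: nobody is listening anymore
            }
        }
    }

    /** Simulates an error that arrives after the subscriber has cancelled. */
    static List<String> run() {
        List<String> events = new ArrayList<>();
        Downstream downstream = new Downstream() {
            @Override public void onError(Throwable t) { events.add("delivered: " + t.getMessage()); }
            @Override public void onComplete() { events.add("completed"); }
        };
        TerminalGuard guard = new TerminalGuard(
                downstream, t -> events.add("dropped: " + t.getMessage()));

        guard.cancel();
        guard.error(new IllegalStateException("INSERT fails"));
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [dropped: INSERT fails]
    }
}
```

If this model matches what happens in my flow, the insert error never reaches the subscriber that the transactional operator is watching, which would fit the missing rollback, but that is a guess on my part.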
pom snippet:
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.0.RELEASE</version>
    <relativePath/>
</parent>
...
<java.version>14</java.version>
<!-- dependencies version -->
<spring-sleuth.version>2.2.2.RELEASE</spring-sleuth.version>
<reactor-tools.version>3.3.5.RELEASE</reactor-tools.version>
<reactor-tools-blockhound.version>1.0.3.RELEASE</reactor-tools-blockhound.version>
<mssql-jdbc.version>7.4.1.jre11</mssql-jdbc.version> <!-- override spring's default -->
<hibernate-validator.version>6.1.5.Final</hibernate-validator.version>
<springdoc-webflux.version>1.3.2</springdoc-webflux.version>
<logstash-logback.version>6.3</logstash-logback.version>
...
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>io.r2dbc</groupId>
    <artifactId>r2dbc-h2</artifactId>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>com.microsoft.sqlserver</groupId>
    <artifactId>mssql-jdbc</artifactId>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>io.r2dbc</groupId>
    <artifactId>r2dbc-mssql</artifactId>
    <scope>runtime</scope>
</dependency>
Repository methods (conceptual):
public Mono<Void> deleteByX(final String x) {
    return this.databaseClient.delete()
        .from(X.class)
        .matching(where("x").is(x))
        .then();
}

public Mono<Long> create(final @NonNull X x) {
    return this.databaseClient.insert()
        .into(X.class)
        .using(x)
        .map(row -> row.get(0, Long.class))
        .first();
}