2

图书馆:

  1. r2dbc-postgresql-0.8.6.RELEASE
  2. r2dbc-pool-0.8.5.RELEASE
  3. r2dbc-spi-0.8.3.RELEASE
  4. postgresql-42.2.18
  5. 项目清单

问题:我尝试使用 R2DBC (PostgreSQL) 批量插入,代码如下:

@Override
public Flux<Long> test(List<User> users) {
    return Mono.from(connectionFactory.create())
    .flatMapMany(c -> Mono.from(c.beginTransaction())
        .thenMany(Flux.fromIterable(users)
        .map(u -> {
            return Flux.from(c.createStatement("INSERT INTO public.users(name, age, salary) VALUES ($1, $2, $3)").returnGeneratedValues("id")
                .bind(0, u.getName())
                .bind(1, u.getAge())
                .bind(2, u.getSalary()).execute());
        })
        .flatMap(result -> result)
        .map(result -> result.map((row, meta) -> {
            return row.get("id", Long.class);
        }))
        .flatMap(Flux::from)
        .delayUntil(r -> c.commitTransaction())
        .doFinally((st) -> c.close())));
}

该代码将执行语句以将用户插入数据库,然后获取生成的用户 ID。如果用户列表小于或等于 255,则上述代码按预期工作。当用户列表大于 255(256~)时,发生以下异常:

[5b38a8c6-2] There was an unexpected error (type=Internal Server Error, status=500).
Cannot exchange messages because the request queue limit is exceeded
io.r2dbc.postgresql.client.ReactorNettyClient$RequestQueueException: Cannot exchange messages because the request queue limit is exceeded
    at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.lambda$addConversation$2(ReactorNettyClient.java:809)
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
    |_ checkpoint ⇢ Handler xwitch.org.helloworld.rest.v2.CRUDController#importUsersBatchByR2DBC() [DispatcherHandler]
    |_ checkpoint ⇢ springfox.boot.starter.autoconfigure.SwaggerUiWebFluxConfiguration$CustomWebFilter [DefaultWebFilterChain]
    |_ checkpoint ⇢ HTTP GET "/api/v2/users/import-users-batch-by-r2dbc" [ExceptionHandlingWebHandler]
Stack trace:
        at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.lambda$addConversation$2(ReactorNettyClient.java:809)
        at reactor.core.publisher.FluxCreate.subscribe(FluxCreate.java:94)
        at io.r2dbc.postgresql.util.FluxDiscardOnCancel.subscribe(FluxDiscardOnCancel.java:49)
        at reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:54)
        at reactor.core.publisher.Flux.subscribe(Flux.java:8147)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:425)
        at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
        at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:250)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
        at reactor.core.publisher.FluxIterable$IterableSubscription.slowPath(FluxIterable.java:270)
        at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:228)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:169)
        at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onSubscribeInner(MonoFlatMapMany.java:150)
        at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onSubscribe(MonoFlatMapMany.java:245)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
        at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:164)
        at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:86)
        at reactor.core.publisher.Flux.subscribe(Flux.java:8147)
        at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:195)
        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784)
        at reactor.core.publisher.MonoCacheTime$CoordinatorSubscriber.signalCached(MonoCacheTime.java:328)
        at reactor.core.publisher.MonoCacheTime$CoordinatorSubscriber.onNext(MonoCacheTime.java:345)
        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:148)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.ignoreDone(MonoIgnoreThen.java:191)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreInner.onComplete(MonoIgnoreThen.java:248)
        at reactor.core.publisher.FluxHandle$HandleSubscriber.onComplete(FluxHandle.java:212)
        at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onComplete(FluxPeekFuseable.java:940)
        at reactor.core.publisher.FluxCreate$BaseSink.complete(FluxCreate.java:439)
        at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:784)
        at reactor.core.publisher.FluxCreate$BufferAsyncSink.complete(FluxCreate.java:732)
        at reactor.core.publisher.FluxCreate$SerializedFluxSink.drainLoop(FluxCreate.java:240)
        at reactor.core.publisher.FluxCreate$SerializedFluxSink.drain(FluxCreate.java:206)
        at reactor.core.publisher.FluxCreate$SerializedFluxSink.complete(FluxCreate.java:197)
        at io.r2dbc.postgresql.client.ReactorNettyClient$Conversation.complete(ReactorNettyClient.java:719)
        at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.emit(ReactorNettyClient.java:984)
        at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:860)
        at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:767)
        at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:118)
        at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
        at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:220)
        at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:220)
        at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:265)
        at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:371)
        at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:381)
        at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)

当我尝试调查以检测发生了什么时。我看到异常是由 ReactorNettyClient.java 引发的。实现是:

public Flux<BackendMessage> addConversation(Predicate<BackendMessage> takeUntil, Publisher<FrontendMessage> requests, Consumer<Flux<FrontendMessage>> sender,
                                                Supplier<Boolean> isConnected) {

        return Flux.create(sink -> {

            Conversation conversation = new Conversation(takeUntil, sink);

            // ensure ordering in which conversations are added to both queues.
            synchronized (this.conversations) {
                if (this.conversations.offer(conversation)) {

                    sink.onRequest(value -> onRequest(conversation, value));

                    if (!isConnected.get()) {
                        sink.error(new PostgresConnectionClosedException("Cannot exchange messages because the connection is closed"));
                        return;
                    }

                    Flux<FrontendMessage> requestMessages = Flux.from(requests).doOnNext(m -> {
                        if (!isConnected.get()) {
                            sink.error(new PostgresConnectionClosedException("Cannot exchange messages because the connection is closed"));
                        }
                    });

                    sender.accept(requestMessages);
                } else {
                    sink.error(new RequestQueueException("Cannot exchange messages because the request queue limit is exceeded"));

                }
            }
        });
    }

Queue 超过 255 且 Queue.offer 方法返回 false 时出错。导致异常被抛出。

对不起,我不熟悉英语。请帮我弄清楚发生了什么以及解决它的解决方案。我想批量插入每个请求的记录数> 100000。

谢谢你。

4

0 回答 0