0

我正在使用r2dbc-postgresql驱动程序。假设我们有一个包含 1000 条记录的表,并且fetchSize是 100:

connectionMono.flatMapMany(
                connection -> connection
                        .createStatement("select age from users")
                        .fetchSize(100)
                        .execute())

将执行多少个网络调用?我知道使用 JDBC Statement.SetFetchsize,驱动程序将获取 10 批中的所有行,每批 100 行。

4

1 回答 1

1

查看 r2dbc 驱动程序中的代码,行为是相同的:它按块获取具有指定大小的行,因此在您的情况下为 100。

这是处理ExtendedQueryMessageFlow的方法的代码:

    /**
     * Execute the query and indicate to fetch rows in chunks with the {@link Execute} message.
     *
     * @param bindFlow  the initial bind flow
     * @param client    client to use
     * @param portal    the portal
     * @param fetchSize fetch size per roundtrip
     * @return the resulting message stream
     */
    private static Flux<BackendMessage> fetchCursored(Flux<FrontendMessage> bindFlow, Client client, String portal, int fetchSize) {

        DirectProcessor<FrontendMessage> requestsProcessor = DirectProcessor.create();
        FluxSink<FrontendMessage> requestsSink = requestsProcessor.sink();
        AtomicBoolean isCanceled = new AtomicBoolean(false);

        return client.exchange(bindFlow.concatWithValues(new CompositeFrontendMessage(new Execute(portal, fetchSize), Flush.INSTANCE)).concatWith(requestsProcessor))
            .handle((BackendMessage message, SynchronousSink<BackendMessage> sink) -> {
                if (message instanceof CommandComplete) {
                    requestsSink.next(new Close(portal, PORTAL));
                    requestsSink.next(Sync.INSTANCE);
                    requestsSink.complete();
                    sink.next(message);
                } else if (message instanceof ErrorResponse) {
                    requestsSink.next(Sync.INSTANCE);
                    requestsSink.complete();
                    sink.next(message);
                } else if (message instanceof PortalSuspended) {
                    if (isCanceled.get()) {
                        requestsSink.next(new Close(portal, PORTAL));
                        requestsSink.next(Sync.INSTANCE);
                        requestsSink.complete();
                    } else {
                        requestsSink.next(new Execute(portal, fetchSize));
                        requestsSink.next(Flush.INSTANCE);
                    }
                } else {
                    sink.next(message);
                }
            })
            .as(flux -> Operators.discardOnCancel(flux, () -> isCanceled.set(true)));
    }
于 2021-02-22T16:15:28.573 回答