查看 r2dbc 驱动程序中的代码,行为是相同的:它按块获取具有指定大小的行,因此在您的情况下为 100。
这是处理ExtendedQueryMessageFlow的方法的代码:
/**
* Execute the query and indicate to fetch rows in chunks with the {@link Execute} message.
*
* @param bindFlow the initial bind flow
* @param client client to use
* @param portal the portal
* @param fetchSize fetch size per roundtrip
* @return the resulting message stream
*/
private static Flux<BackendMessage> fetchCursored(Flux<FrontendMessage> bindFlow, Client client, String portal, int fetchSize) {
DirectProcessor<FrontendMessage> requestsProcessor = DirectProcessor.create();
FluxSink<FrontendMessage> requestsSink = requestsProcessor.sink();
AtomicBoolean isCanceled = new AtomicBoolean(false);
return client.exchange(bindFlow.concatWithValues(new CompositeFrontendMessage(new Execute(portal, fetchSize), Flush.INSTANCE)).concatWith(requestsProcessor))
.handle((BackendMessage message, SynchronousSink<BackendMessage> sink) -> {
if (message instanceof CommandComplete) {
requestsSink.next(new Close(portal, PORTAL));
requestsSink.next(Sync.INSTANCE);
requestsSink.complete();
sink.next(message);
} else if (message instanceof ErrorResponse) {
requestsSink.next(Sync.INSTANCE);
requestsSink.complete();
sink.next(message);
} else if (message instanceof PortalSuspended) {
if (isCanceled.get()) {
requestsSink.next(new Close(portal, PORTAL));
requestsSink.next(Sync.INSTANCE);
requestsSink.complete();
} else {
requestsSink.next(new Execute(portal, fetchSize));
requestsSink.next(Flush.INSTANCE);
}
} else {
sink.next(message);
}
})
.as(flux -> Operators.discardOnCancel(flux, () -> isCanceled.set(true)));
}