我希望服务器的所有调用都将并行处理,但事实并非如此。这是一个简单的例子。
RSocket 版本:1.1.0
服务器
public class ServerApp {
private static final Logger log = LoggerFactory.getLogger(ServerApp.class);
public static void main(String[] args) throws InterruptedException {
RSocketServer.create(SocketAcceptor.forRequestResponse(payload ->
Mono.fromCallable(() -> {
log.debug("Start of my business logic");
sleepSeconds(5);
return DefaultPayload.create("OK");
})))
.bind(WebsocketServerTransport.create(15000))
.block();
log.debug("Server started");
TimeUnit.MINUTES.sleep(30);
}
private static void sleepSeconds(int sec) {
try {
TimeUnit.SECONDS.sleep(sec);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
客户
public class ClientApp {
private static final Logger log = LoggerFactory.getLogger(ClientApp.class);
public static void main(String[] args) throws InterruptedException {
RSocket client = RSocketConnector.create()
.connect(WebsocketClientTransport.create(15000))
.block();
long start1 = System.currentTimeMillis();
client.requestResponse(DefaultPayload.create("Request 1"))
.doOnNext(r -> log.debug("finished within {}ms", System.currentTimeMillis() - start1))
.subscribe();
long start2 = System.currentTimeMillis();
client.requestResponse(DefaultPayload.create("Request 2"))
.doOnNext(r -> log.debug("finished within {}ms", System.currentTimeMillis() - start2))
.subscribe();
TimeUnit.SECONDS.sleep(20);
}
}
在客户端日志中,我们可以看到两个请求同时发送,并且在 10 秒后同时收到两个响应(每个请求在 5 秒内进行)。
在服务器日志中,我们可以看到请求是按顺序执行的,而不是并行执行的。
你能帮我理解这种行为吗?
- 为什么我们在 10 秒后收到第一个响应,而不是 5 秒?
- 如果我希望并行处理所有请求,如何正确创建服务器?
如果我替换Mono.fromCallable
为Mono.fromFuture(CompletableFuture.supplyAsync(() -> myBusinessLogic(), executorService))
,那么它将解决 1。
如果我替换Mono.fromCallable
为Mono.delay(Duration.ZERO).map(ignore -> myBusinessLogic()
,那么它将解决 1. 和 2。
如果我替换Mono.fromCallable
为Mono.create(sink -> sink.success(myBusinessLogic()))
,那么它不会解决我的问题。
客户端日志:
2021-07-16 10:39:46,880 DEBUG [reactor-tcp-nio-1] [/] - sending ->
Frame => Stream ID: 0 Type: SETUP Flags: 0b0 Length: 56
Data:
2021-07-16 10:39:46,952 DEBUG [main] [/] - sending ->
Frame => Stream ID: 1 Type: REQUEST_RESPONSE Flags: 0b0 Length: 15
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 52 65 71 75 65 73 74 20 31 |Request 1 |
+--------+-------------------------------------------------+----------------+
2021-07-16 10:39:46,957 DEBUG [main] [/] - sending ->
Frame => Stream ID: 3 Type: REQUEST_RESPONSE Flags: 0b0 Length: 15
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 52 65 71 75 65 73 74 20 32 |Request 2 |
+--------+-------------------------------------------------+----------------+
2021-07-16 10:39:57,043 DEBUG [reactor-tcp-nio-1] [/] - receiving ->
Frame => Stream ID: 1 Type: NEXT_COMPLETE Flags: 0b1100000 Length: 8
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 4f 4b |OK |
+--------+-------------------------------------------------+----------------+
2021-07-16 10:39:57,046 DEBUG [reactor-tcp-nio-1] [/] - finished within 10120ms
2021-07-16 10:39:57,046 DEBUG [reactor-tcp-nio-1] [/] - receiving ->
Frame => Stream ID: 3 Type: NEXT_COMPLETE Flags: 0b1100000 Length: 8
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 4f 4b |OK |
+--------+-------------------------------------------------+----------------+
2021-07-16 10:39:57,046 DEBUG [reactor-tcp-nio-1] [/] - finished within 10094ms
服务器日志:
2021-07-16 10:39:46,965 DEBUG [reactor-http-nio-2] [/] - receiving ->
Frame => Stream ID: 0 Type: SETUP Flags: 0b0 Length: 56
Data:
2021-07-16 10:39:47,021 DEBUG [reactor-http-nio-2] [/] - receiving ->
Frame => Stream ID: 1 Type: REQUEST_RESPONSE Flags: 0b0 Length: 15
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 52 65 71 75 65 73 74 20 31 |Request 1 |
+--------+-------------------------------------------------+----------------+
2021-07-16 10:39:47,027 DEBUG [reactor-http-nio-2] [/] - Start of my business logic
2021-07-16 10:39:52,037 DEBUG [reactor-http-nio-2] [/] - sending ->
Frame => Stream ID: 1 Type: NEXT_COMPLETE Flags: 0b1100000 Length: 8
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 4f 4b |OK |
+--------+-------------------------------------------------+----------------+
2021-07-16 10:39:52,038 DEBUG [reactor-http-nio-2] [/] - receiving ->
Frame => Stream ID: 3 Type: REQUEST_RESPONSE Flags: 0b0 Length: 15
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 52 65 71 75 65 73 74 20 32 |Request 2 |
+--------+-------------------------------------------------+----------------+
2021-07-16 10:39:52,038 DEBUG [reactor-http-nio-2] [/] - Start of my business logic
2021-07-16 10:39:57,039 DEBUG [reactor-http-nio-2] [/] - sending ->
Frame => Stream ID: 3 Type: NEXT_COMPLETE Flags: 0b1100000 Length: 8
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 4f 4b |OK |
+--------+-------------------------------------------------+----------------+