2

我希望服务器的所有调用都将并行处理,但事实并非如此。这是一个简单的例子。

RSocket 版本:1.1.0

服务器

public class ServerApp {
    private static final Logger log = LoggerFactory.getLogger(ServerApp.class);

    public static void main(String[] args) throws InterruptedException {
        RSocketServer.create(SocketAcceptor.forRequestResponse(payload ->
                Mono.fromCallable(() -> {
                    log.debug("Start of my business logic");
                    sleepSeconds(5);
                    return DefaultPayload.create("OK");
                })))
                .bind(WebsocketServerTransport.create(15000))
                .block();
        log.debug("Server started");
        TimeUnit.MINUTES.sleep(30);
    }

    private static void sleepSeconds(int sec) {
        try {
            TimeUnit.SECONDS.sleep(sec);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

客户

public class ClientApp {
    private static final Logger log = LoggerFactory.getLogger(ClientApp.class);

    public static void main(String[] args) throws InterruptedException {
        RSocket client = RSocketConnector.create()
                .connect(WebsocketClientTransport.create(15000))
                .block();

        long start1 = System.currentTimeMillis();
        client.requestResponse(DefaultPayload.create("Request 1"))
                .doOnNext(r -> log.debug("finished within {}ms", System.currentTimeMillis() - start1))
                .subscribe();

        long start2 = System.currentTimeMillis();
        client.requestResponse(DefaultPayload.create("Request 2"))
                .doOnNext(r -> log.debug("finished within {}ms", System.currentTimeMillis() - start2))
                .subscribe();

        TimeUnit.SECONDS.sleep(20);
    }
}

在客户端日志中,我们可以看到两个请求同时发送,并且在 10 秒后同时收到两个响应(每个请求在 5 秒内进行)。

在服务器日志中,我们可以看到请求是按顺序执行的,而不是并行执行的。

你能帮我理解这种行为吗?

  1. 为什么我们在 10 秒后收到第一个响应,而不是 5 秒?
  2. 如果我希望并行处理所有请求,如何正确创建服务器?

如果我替换Mono.fromCallableMono.fromFuture(CompletableFuture.supplyAsync(() -> myBusinessLogic(), executorService)),那么它将解决 1。

如果我替换Mono.fromCallableMono.delay(Duration.ZERO).map(ignore -> myBusinessLogic(),那么它将解决 1. 和 2。

如果我替换Mono.fromCallableMono.create(sink -> sink.success(myBusinessLogic())),那么它不会解决我的问题。

客户端日志:

2021-07-16 10:39:46,880 DEBUG [reactor-tcp-nio-1] [/] - sending -> 
Frame => Stream ID: 0 Type: SETUP Flags: 0b0 Length: 56
Data:

2021-07-16 10:39:46,952 DEBUG [main] [/] - sending -> 
Frame => Stream ID: 1 Type: REQUEST_RESPONSE Flags: 0b0 Length: 15
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 52 65 71 75 65 73 74 20 31                      |Request 1       |
+--------+-------------------------------------------------+----------------+
2021-07-16 10:39:46,957 DEBUG [main] [/] - sending -> 
Frame => Stream ID: 3 Type: REQUEST_RESPONSE Flags: 0b0 Length: 15
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 52 65 71 75 65 73 74 20 32                      |Request 2       |
+--------+-------------------------------------------------+----------------+
2021-07-16 10:39:57,043 DEBUG [reactor-tcp-nio-1] [/] - receiving -> 
Frame => Stream ID: 1 Type: NEXT_COMPLETE Flags: 0b1100000 Length: 8
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 4f 4b                                           |OK              |
+--------+-------------------------------------------------+----------------+
2021-07-16 10:39:57,046 DEBUG [reactor-tcp-nio-1] [/] - finished within 10120ms
2021-07-16 10:39:57,046 DEBUG [reactor-tcp-nio-1] [/] - receiving -> 
Frame => Stream ID: 3 Type: NEXT_COMPLETE Flags: 0b1100000 Length: 8
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 4f 4b                                           |OK              |
+--------+-------------------------------------------------+----------------+
2021-07-16 10:39:57,046 DEBUG [reactor-tcp-nio-1] [/] - finished within 10094ms

服务器日志:

2021-07-16 10:39:46,965 DEBUG [reactor-http-nio-2] [/] - receiving -> 
Frame => Stream ID: 0 Type: SETUP Flags: 0b0 Length: 56
Data:

2021-07-16 10:39:47,021 DEBUG [reactor-http-nio-2] [/] - receiving -> 
Frame => Stream ID: 1 Type: REQUEST_RESPONSE Flags: 0b0 Length: 15
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 52 65 71 75 65 73 74 20 31                      |Request 1       |
+--------+-------------------------------------------------+----------------+
2021-07-16 10:39:47,027 DEBUG [reactor-http-nio-2] [/] - Start of my business logic
2021-07-16 10:39:52,037 DEBUG [reactor-http-nio-2] [/] - sending -> 
Frame => Stream ID: 1 Type: NEXT_COMPLETE Flags: 0b1100000 Length: 8
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 4f 4b                                           |OK              |
+--------+-------------------------------------------------+----------------+
2021-07-16 10:39:52,038 DEBUG [reactor-http-nio-2] [/] - receiving -> 
Frame => Stream ID: 3 Type: REQUEST_RESPONSE Flags: 0b0 Length: 15
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 52 65 71 75 65 73 74 20 32                      |Request 2       |
+--------+-------------------------------------------------+----------------+
2021-07-16 10:39:52,038 DEBUG [reactor-http-nio-2] [/] - Start of my business logic
2021-07-16 10:39:57,039 DEBUG [reactor-http-nio-2] [/] - sending -> 
Frame => Stream ID: 3 Type: NEXT_COMPLETE Flags: 0b1100000 Length: 8
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 4f 4b                                           |OK              |
+--------+-------------------------------------------------+----------------+
4

1 回答 1

1

您不应该将诸如 Reactive Mono 操作之类的异步代码与诸如

    private static void sleepSeconds(int sec) {
        try {
            TimeUnit.SECONDS.sleep(sec);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

我怀疑这里的中心问题是像 rsocket-java 这样的框架不想在新线程上运行所有东西,代价是过度的上下文切换。因此,通常依赖于您适当地运行长时间运行的 CPU 或 IO 操作。

您应该查看各种异步延迟操作https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html#delayElement-java.time.Duration-

如果您的延迟是为了模拟长时间运行的操作,那么您应该查看订阅不同的调度程序,例如https://projectreactor.io/docs/core/release/api/reactor/core/scheduler/Schedulers.html#boundedElastic --

于 2021-07-17T09:09:18.220 回答