1

我正在尝试创建一个流,其中通量发出 10 个项目,每个项目并行,每个项目休眠 1 秒。由于每个项目都在单独的线程上发布,我希望整个过程需要 1 秒。但是日志显示它需要 10 秒。

我尝试将 subscribeOn 更改为 publishOn,映射到 doOnNext。但它们似乎都不起作用。

我是 Reactor 的新手,正在尝试了解我哪里出错了。非常感激任何的帮助。谢谢

    public void whenIsANewThreadCreated() {
        Flux.range(1,10)
                .publishOn(Schedulers.elastic())
                .map(count -> {
                    logger.info(Thread.currentThread().getName() + " - Sleeping for 1s" + " with count: " + count);
                    try {
                        Thread.sleep(1_000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return count;
                })
        .blockLast();
    }
2020-03-30 16:17:29.799  INFO 15744 --- [      elastic-2] com.example.demo.DemoApplicationTests    : elastic-2 - Sleeping for 1s with count: 1
2020-03-30 16:17:30.802  INFO 15744 --- [      elastic-2] com.example.demo.DemoApplicationTests    : elastic-2 - Sleeping for 1s with count: 2
2020-03-30 16:17:31.804  INFO 15744 --- [      elastic-2] com.example.demo.DemoApplicationTests    : elastic-2 - Sleeping for 1s with count: 3
2020-03-30 16:17:32.805  INFO 15744 --- [      elastic-2] com.example.demo.DemoApplicationTests    : elastic-2 - Sleeping for 1s with count: 4
2020-03-30 16:17:33.806  INFO 15744 --- [      elastic-2] com.example.demo.DemoApplicationTests    : elastic-2 - Sleeping for 1s with count: 5
2020-03-30 16:17:34.808  INFO 15744 --- [      elastic-2] com.example.demo.DemoApplicationTests    : elastic-2 - Sleeping for 1s with count: 6
2020-03-30 16:17:35.809  INFO 15744 --- [      elastic-2] com.example.demo.DemoApplicationTests    : elastic-2 - Sleeping for 1s with count: 7
2020-03-30 16:17:36.811  INFO 15744 --- [      elastic-2] com.example.demo.DemoApplicationTests    : elastic-2 - Sleeping for 1s with count: 8
2020-03-30 16:17:37.812  INFO 15744 --- [      elastic-2] com.example.demo.DemoApplicationTests    : elastic-2 - Sleeping for 1s with count: 9
2020-03-30 16:17:38.814  INFO 15744 --- [      elastic-2] com.example.demo.DemoApplicationTests    : elastic-2 - Sleeping for 1s with count: 10

4

3 回答 3

2

您必须首先通过调用parallel方法创建一个并行通量,并且您必须使用它runOn来实现并行性。

Flux.range(1,10)
    .parallel()
    .runOn(Schedulers.elastic())
    .map(count -> {
        System.out.println(Thread.currentThread().getName() + " - Sleeping for 1s" + " with count: " + count);
        try {
            Thread.sleep(1_000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return count;
    }).subscribe();

  • 不鼓励使用Schedulers.boundedElastic()as usingScheduler.elastic()
  • parallel默认情况下将根据您的 CPU 内核创建线程。如果您想使用更多线程parallel(10)- 我认为这就是您想要看到的。
于 2020-03-30T20:52:56.283 回答
1

该规范要求onNext事件被串行调用。您map正在有效地将输入onNext事件转换为onNext阻塞 1 秒的事件。根据规范,10 个传入onNext导致一系列 10 个传出onNext,每个阻塞 1 秒 => 10 秒阻塞。

parallel(10).runOn(Scheduler.elastic())如果您想在 10 个平行轨道上分配阻塞工作负载,您绝对 100% 必须使用。(Schedulerfor runOn 也可以是Schedulers.boundedElastic(), 或Schedulers.newParallel(10))。

于 2020-04-02T09:23:26.667 回答
0

参考:https ://projectreactor.io/docs/core/release/reference/#faq.wrap-blocking

您可以在后台启动这些进程并获得多线程。这不是并行。在执行 CPU 密集型任务时应该使用并行调度程序,在 I/O 或阻塞操作时使用弹性调度程序。

public void whenIsANewThreadCreated() {
    Flux.range(1,10)
            .subscribeOn(Schedulers.boundedElastic()) // if not, main calling thread will be used
            .flatMap(count -> {
                log.info(Thread.currentThread().getName() + " - Sleeping for 1s" + " with count: " + count);
                return Mono.fromCallable(() -> method5(count)).subscribeOn(Schedulers.boundedElastic());
            })
            .blockLast();
}


Mono<Integer> method5(int count) {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return Mono.just(count);
}

你会得到这样的东西

 23:42:33.289 [boundedElastic-1] INFO  com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 1
 23:42:33.342 [boundedElastic-1] INFO  com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 2
 23:42:33.342 [boundedElastic-1] INFO  com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 3
 23:42:33.342 [boundedElastic-1] INFO  com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 4
 23:42:33.343 [boundedElastic-1] INFO  com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 5
 23:42:33.343 [boundedElastic-1] INFO  com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 6
 23:42:33.343 [boundedElastic-1] INFO  com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 7
 23:42:33.343 [boundedElastic-1] INFO  com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 8
 23:42:33.344 [boundedElastic-1] INFO  com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 9
 23:42:33.344 [boundedElastic-1] INFO  com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 10
于 2021-09-21T06:48:46.263 回答