0

我使用 Spring 反应式作为服务器来进行昂贵的生成并在 Flux 中一一返回结果。如果请求被取消(例如约束和太紧),这具有停止生成的优点。我的代码如下所示:

    public Flux<Entity> generate(int nbrOfEntitiesToGenerate, Constaints constraints) {
        return Flux.range(0, nbrOfEntitiesToGenerate)
            .map(x -> Generator.expensiveGeneration(constraints)
//            .subscribeOn(Schedulers.parallel())
            ;
    }

这只完成了我想要的一半,取消时我不会拨打下一个电话expensiveGeneration,但不会停止当前运行昂贵的生成,如果约束太紧,可能永远不会完成。请问我该怎么做。

额外的问题,如果你知道,我怎样才能在并行中生成 x 个实体以最大限度地利用我的线程(当然不是一次启动所有代)。

提前致谢。

4

1 回答 1

0

Scheduler从 a创建 a很简单,但如果要取消,则ExecutorService需要保存可调用对象。Future<?>我将其更改Generator为保存并包装该cancel方法,该方法在Flux处理doOnCancel.

public class FluxPlay {
    public static void main(String[] args) {
        new FluxPlay().run();
    }
    private void run() {
        Flux<LocalDateTime> f = generate(10);
        Disposable d = f.subscribeOn(Schedulers.single()).subscribe(System.out::println);
        try {
            Thread.sleep(4500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        d.dispose();
    }

    private Flux<LocalDateTime> generate(int nbrOfEntitiesToGenerate) {
        Generator generator = new Generator();
        return Flux.range(0, nbrOfEntitiesToGenerate)
        .map(x -> generator.expensiveGeneration())
        .doOnCancel(generator::cancel)
        .doFinally(generator::shutdown)
        .publishOn(Schedulers.fromExecutor(generator::submit));
    }
}

和:

public class Generator {
    Future<?> f;
    ExecutorService es = Executors.newSingleThreadExecutor();
    public void submit(Runnable command) {
      f = es.submit(command);
    }
    public void cancel() {
        f.cancel(true);
    }
    public void shutdown(SignalType st) {
        es.shutdown();
    }
    public LocalDateTime expensiveGeneration() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            System.out.println("Interrupted");
        }
        return LocalDateTime.now();
    }
}
于 2020-03-12T10:01:00.617 回答