2

假设我们需要Flux基于Closeable资源的内容创建一个。为清楚起见,说有一个BufferedReader要转换为Flux<String>.

BufferedReader reader = createReader("my_resource_path");
Flux<String> flux = Flux.fromIterable(() -> iteratorOfLines(reader));

让我们假设iteratorOfLines产生有限的项目集。

我正在寻找一种方法来关闭BufferedReader当它Flux消耗了所有数据或由于某种原因不需要剩余数据时(即订阅被中止)。

有一个构造函数reactor.core.publisher.FluxIterable(Iterable iterable, Runnable onClose),但是:

  1. 似乎无法从反应器的公共 API 访问(甚至是传递性的)
  2. 我怀疑它是否有帮助,因为它不包括 Flux 在获取可迭代的最后一项之前停止的情况。

Flux.fromIterable发布最后一个项目后清理/关闭资源的正确方法是什么?

可能有比做fromIterable类似事情更好的方法,所以欢迎所有选择。

4

1 回答 1

2

对于等效于尝试使用的资源,您可以使用using

    Flux.using(
            //Set up resource
            () -> createReader("my_resource_path"),
            //Create flux from resource
            reader -> Flux.fromIterable(iteratorOfLines(reader)),
            //Perform action (cleanup/close) 
            //when resource completes/errors/cancelled
            reader -> {
                try{
                    reader.close();
                }catch(IOException e){
                    throw Exceptions.propagate(e);
                }
            }
    );
于 2020-06-09T07:06:14.747 回答