假设我们需要Flux
基于Closeable
资源的内容创建一个。为清楚起见,说有一个BufferedReader
要转换为Flux<String>
.
BufferedReader reader = createReader("my_resource_path");
Flux<String> flux = Flux.fromIterable(() -> iteratorOfLines(reader));
让我们假设iteratorOfLines
产生有限的项目集。
我正在寻找一种方法来关闭BufferedReader
当它Flux
消耗了所有数据或由于某种原因不需要剩余数据时(即订阅被中止)。
有一个构造函数reactor.core.publisher.FluxIterable(Iterable iterable, Runnable onClose)
,但是:
- 似乎无法从反应器的公共 API 访问(甚至是传递性的)
- 我怀疑它是否有帮助,因为它不包括 Flux 在获取可迭代的最后一项之前停止的情况。
Flux.fromIterable
发布最后一个项目后清理/关闭资源的正确方法是什么?
可能有比做fromIterable
类似事情更好的方法,所以欢迎所有选择。