使用 Rx Java,可以通过多种方式解决需求,同时坚持使用 JDK 中的 DirectoryStream。
以下组合将为您提供所需的效果,我将按顺序对其进行解释:
方法 1。使用 flatMap() 和 defer() 运算符的递归方法
方法 2。使用 flatMap() 和 fromCallable 运算符的递归方法
注意:如果将flatMap()的用法替换为concatMap(),则目录树导航必然会以深度优先搜索 (DFS) 方式发生。使用 flatMap(),无法保证 DFS 效果。
方法一:使用 flatMap() 和 defer()
private Observable<Path> recursiveFileSystemNavigation_Using_Defer(Path dir) {
return Observable.<Path>defer(() -> {
//
// try-resource block
//
try(DirectoryStream<Path> children = Files.newDirectoryStream(dir))
{
//This intermediate storage is required because DirectoryStream can't be navigated more than once.
List<Path> subfolders = Observable.<Path>fromIterable(children)
.toList()
.blockingGet();
return Observable.<Path>fromIterable(subfolders)
/* Line X */ .flatMap(p -> !isFolder(p) ? Observable.<Path> just(p) : recursiveFileSystemNavigation_Using_Defer(p), Runtime.getRuntime().availableProcessors());
// /* Line Y */ .concatMap(p -> !isFolder(p) ? Observable.<Path> just(p) : recursiveFileSystemNavigation_Using_Defer(p));
} catch (IOException e) {
/*
This catch block is required even though DirectoryStream is Closeable
resource. Reason is that .close() call on a DirectoryStream throws a
checked exception.
*/
return Observable.<Path>empty();
}
});
}
这种方法是找到给定目录的孩子,然后将孩子作为 Observable 发出。如果一个孩子是一个文件,它将立即可供订阅者使用,否则第 X 行的 flatMap()将调用该方法,递归地将每个子目录作为参数传递。对于每个这样的子目录,flatmap 将同时在内部订阅它们的子目录。这就像需要控制的连锁反应。
因此,使用Runtime.getRuntime().availableProcessors()设置 flatmap() 的最大并发级别,并防止它同时订阅所有子文件夹。如果不设置并发级别,想象一下当一个文件夹有 1000 个子文件夹时会发生什么。
使用defer()可以防止过早地创建 DirectoryStream 并确保它仅在真正订阅以查找其子文件夹时才会发生。
最后,该方法返回一个Observable < Path >以便客户端可以订阅并对结果做一些有用的事情,如下所示:
//
// Using the defer() based approach
//
recursiveDirNavigation.recursiveFileSystemNavigation_Using_Defer(startingDir)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.from(Executors.newFixedThreadPool(1)))
.subscribe(p -> System.out.println(p.toUri()));
使用 defer() 的缺点是,如果它的参数函数抛出一个已检查的异常,它就不能很好地处理已检查的异常。因此,即使DirectoryStream(实现 Closeable)是在 try-resource 块中创建的,我们仍然必须捕获IOException,因为 DirectoryStream 的自动关闭会抛出该检查异常。
在使用基于 Rx 的样式时,使用 catch() 块进行错误处理听起来有点奇怪,因为在反应式编程中偶数错误会作为事件发送。那么我们为什么不使用一个将此类错误作为事件公开的运算符。
在Rx Java 2.x中添加了一个名为fromCallable()的更好的替代方法。第二种方法显示了它的使用。
方法 2. 使用 flatMap() 和 fromCallable 运算符
这种方法使用fromCallable()运算符,它将Callable作为参数。由于我们需要一种递归方法,因此该可调用对象的预期结果是给定文件夹的子对象的 Observable。由于我们希望订阅者在可用时接收结果,因此我们需要从该方法返回一个 Observable。由于内部可调用的结果是一个 Observable 子级列表,因此最终效果是一个 Observable 的 Observable。
private Observable<Observable<Path>> recursiveFileSystemNavigation_WithoutExplicitCatchBlock_UsingFromCallable(Path dir) {
/*
* fromCallable() takes a Callable argument. In this case the callbale's return value itself is
* a list of sub-paths therefore the overall return value of this method is Observable<Observable<Path>>
*
* While subscribing the final results, we'd flatten this return value.
*
* Benefit of using fromCallable() is that it elegantly catches the checked exceptions thrown
* during the callable's call and exposes that via onError() operator chain if you need.
*
* Defer() operator does not give that flexibility and you have to explicitly catch and handle appropriately.
*/
return Observable.<Observable<Path>> fromCallable(() -> traverse(dir))
.onErrorReturnItem(Observable.<Path>empty());
}
private Observable<Path> traverse(Path dir) throws IOException {
//
// try-resource block
//
try(DirectoryStream<Path> children = Files.newDirectoryStream(dir))
{
//This intermediate storage is required because DirectoryStream can't be navigated more than once.
List<Path> subfolders = Observable.<Path>fromIterable(children)
.toList()
.blockingGet();
return Observable.<Path>fromIterable(subfolders)
/* Line X */ .flatMap(p -> ( !isFolder(p) ? Observable.<Path> just(p) : recursiveFileSystemNavigation_WithoutExplicitCatchBlock_UsingFromCallable(p).blockingSingle())
,Runtime.getRuntime().availableProcessors());
// /* Line Y */ .concatMap(p -> ( !isFolder(p) ? Observable.<Path> just(p) : recursiveFileSystemNavigation_WithoutExplicitCatchBlock_UsingFromCallable(p).blockingSingle() ));
}
}
然后,订阅者将需要展平结果流,如下所示:
//
// Using the fromCallable() based approach
//
recursiveDirNavigation.recursiveFileSystemNavigation_WithoutExplicitCatchBlock_UsingFromCallable(startingDir)
.subscribeOn(Schedulers.io())
.flatMap(p -> p)
.observeOn(Schedulers.from(Executors.newFixedThreadPool(1)))
.subscribe(filePath -> System.out.println(filePath.toUri()));
在 traverse() 方法中,为什么第 X 行使用阻塞 Get
因为递归函数返回一个 Observable<Observable>,但是该行的 flatmap 需要一个 Observable 来订阅。
两种方法中的 Y 行都使用 concatMap()
因为如果我们不希望在 flatmap() 进行的内部订阅期间出现并行性,那么 concatMap() 可以很方便地使用。
在这两种方法中,方法isFolder的实现如下所示:
private boolean isFolder(Path p){
if(p.toFile().isFile()){
return false;
}
return true;
}
Java RX 2.0 的 Maven 坐标
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.0.3</version>
</dependency>
Java 文件中的导入
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.Executors;
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;