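The tool I'm working on needs to traverse a directory structure, pick out files of certain types, and perform an operation on each of them (one that takes considerably longer than the tree traversal itself). Naturally, I implemented the Callable interface and run the workload asynchronously on a number of threads that is determined at runtime.
My questions are as follows:
- For the sake of learning something interesting, how could the following working code be rewritten to use CompletableFuture?
- My per-thread timer appears to be implemented incorrectly: it only shows the last elapsed time for a given worker thread. How would I fix this elegantly?
Here is the working code: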
import org.testng.annotations.Test;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Paths;
import java.util.*;
import java.util.concurrent.*;

public class DirWalkTest {
    @Test
    public void DirWalkNonBlocking() {
        final String directory = "./test/pdfWalker";
        class myCallable implements Callable<Boolean>{
            private final File file;
            public myCallable(File f) {
                this.file = f;
            }
            public Boolean call() throws Exception {
                initTimerNS("DirWalkNonBlocking-" + Thread.currentThread().getName());
                StringBuffer sb = new StringBuffer();
                Formatter formatter = new Formatter(sb);
                formatter.format("Callback [%s] invoked: %s%n", Thread.currentThread().getName(), file.toString());
                try {
                    TimeUnit.MILLISECONDS.sleep(1500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                formatter.format("Callback [%s] finished: %s%n", Thread.currentThread().getName(), file.toString());
                //This seems to be synchronised as opposed to the individual System.out.printf()
                System.out.printf("%s", sb.toString());
                getElapsedTimeNS("DirWalkNonBlocking-" + Thread.currentThread().getName());
                return true;
            }
        }
        //TODO: CompletableFuture is the new kid on the block ...
        // http://www.nurkiewicz.com/2013/05/java-8-definitive-guide-to.html
        ExecutorService executor = Executors.newWorkStealingPool();
        System.out.printf("Running thread pool with %s threads%n", Runtime.getRuntime().availableProcessors());
        initTimerNS("overall");
        try {
            Files.walk(Paths.get(directory)).
                filter(Files::isRegularFile).
                filter(p -> p.getFileName().toString().toLowerCase().endsWith(".pdf")).
                forEach(p -> executor.submit(new myCallable(p.toFile())));
        } catch (IOException e) {
            e.printStackTrace();
        }
        try {
            executor.shutdown();
            executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.printf("DirWalkNonBlocking() completed in %.2fms%n", getElapsedTimeNS("overall") / 10E5);
        for (Map.Entry<String, Long> entry : end.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith("DirWalkNonBlocking-")) {
                System.out.printf("Run: thread=%s elapsed=%.2fms%n", key, (end.get(key) - start.get(key)) / 10E5);
            }
        }
    }

    public static Map<String, Long> start = new ConcurrentHashMap<>();
    public static Map<String, Long> end = new ConcurrentHashMap<>();

    public static void initTimerNS(String key) {
        start.put(key, System.nanoTime());
    }

    public static double getElapsedTimeNS(String key) {
        end.put(key, System.nanoTime());
        return end.get(key) - start.get(key);
    }
}
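For the first question, one possible shape of a CompletableFuture version is sketched below. It is an illustration under assumptions rather than a drop-in replacement: the class name CompletableFutureWalkSketch and the methods process() and processPdfs() are hypothetical, process() stands in for the real per-file work with a short sleep, and the TestNG wrapper is left out. The walk and the two filters are the same as in DirWalkTest; each matching path is handed to CompletableFuture.supplyAsync on the same kind of work-stealing pool.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class CompletableFutureWalkSketch {

    // Hypothetical stand-in for the real per-file work (the sleep in the question).
    static String process(Path file) {
        try {
            TimeUnit.MILLISECONDS.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
        }
        return file.getFileName().toString();
    }

    // Walks root, starts one CompletableFuture per regular *.pdf file,
    // waits for all of them, and returns their results.
    static List<String> processPdfs(Path root, ExecutorService executor) throws IOException {
        List<CompletableFuture<String>> futures;
        // Files.walk keeps directory handles open, so close the stream when done.
        try (Stream<Path> paths = Files.walk(root)) {
            futures = paths
                    .filter(Files::isRegularFile)
                    .filter(p -> p.getFileName().toString().toLowerCase().endsWith(".pdf"))
                    .map(p -> CompletableFuture.supplyAsync(() -> process(p), executor))
                    .collect(Collectors.toList()); // collect first so every task is submitted
        }
        // allOf completes once every future has completed.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        // All futures are done here, so these join() calls return immediately.
        return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

    public static void main(String[] args) throws IOException {
        Path root = Paths.get(args.length > 0 ? args[0] : "./test/pdfWalker");
        ExecutorService executor = Executors.newWorkStealingPool();
        try {
            processPdfs(root, executor)
                    .forEach(name -> System.out.println("Processed: " + name));
        } finally {
            executor.shutdown();
        }
    }
}
```

Collecting the futures into a list before joining matters: mapping and joining in the same lazy stream pipeline would submit one task, wait for it, and only then submit the next. Follow-up steps can be chained per file with thenApply or thenAccept instead of being read from the returned list.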
The following directory structure (under ./test/pdfWalker) was used to test DirWalkNonBlocking():
$ find . -printf "type=%y: file=%p\n" | sort
type=d: file=.
type=d: file=./1
type=d: file=./1/2
type=d: file=./1/2/3
type=d: file=./1/dir.pdf
type=f: file=./1/2/3/iwke
type=f: file=./1/2/3/j.pdf
type=f: file=./1/2/g.pdf
type=f: file=./1/9843ii.pdf
type=f: file=./1/ijj
type=f: file=./1/kk.pdf
Running the test produces the following output:
Running thread pool with 8 threads
Callback [ForkJoinPool-1-worker-3] invoked: ./test/pdfWalker/1.pdf
Callback [ForkJoinPool-1-worker-3] finished: ./test/pdfWalker/1.pdf
Callback [ForkJoinPool-1-worker-5] invoked: ./test/pdfWalker/1/9843ii.pdf
Callback [ForkJoinPool-1-worker-5] finished: ./test/pdfWalker/1/9843ii.pdf
Callback [ForkJoinPool-1-worker-1] invoked: ./test/pdfWalker/3.pdf
Callback [ForkJoinPool-1-worker-1] finished: ./test/pdfWalker/3.pdf
Callback [ForkJoinPool-1-worker-7] invoked: ./test/pdfWalker/1/2/3/j.pdf
Callback [ForkJoinPool-1-worker-7] finished: ./test/pdfWalker/1/2/3/j.pdf
Callback [ForkJoinPool-1-worker-6] invoked: ./test/pdfWalker/1/2/g.pdf
Callback [ForkJoinPool-1-worker-6] finished: ./test/pdfWalker/1/2/g.pdf
Callback [ForkJoinPool-1-worker-4] invoked: ./test/pdfWalker/1/kk.pdf
Callback [ForkJoinPool-1-worker-4] finished: ./test/pdfWalker/1/kk.pdf
Callback [ForkJoinPool-1-worker-2] invoked: ./test/pdfWalker/2.pdf
Callback [ForkJoinPool-1-worker-2] finished: ./test/pdfWalker/2.pdf
Callback [ForkJoinPool-1-worker-0] invoked: ./test/pdfWalker/4.pdf
Callback [ForkJoinPool-1-worker-0] finished: ./test/pdfWalker/4.pdf
Callback [ForkJoinPool-1-worker-1] invoked: ./test/pdfWalker/7.pdf
Callback [ForkJoinPool-1-worker-1] finished: ./test/pdfWalker/7.pdf
Callback [ForkJoinPool-1-worker-7] invoked: ./test/pdfWalker/8.pdf
Callback [ForkJoinPool-1-worker-7] finished: ./test/pdfWalker/8.pdf
Callback [ForkJoinPool-1-worker-3] invoked: ./test/pdfWalker/5.pdf
Callback [ForkJoinPool-1-worker-3] finished: ./test/pdfWalker/5.pdf
Callback [ForkJoinPool-1-worker-5] invoked: ./test/pdfWalker/6.pdf
Callback [ForkJoinPool-1-worker-5] finished: ./test/pdfWalker/6.pdf
DirWalkNonBlocking() completed in 3131.57ms
Run: thread=DirWalkNonBlocking-ForkJoinPool-1-worker-7 elapsed=1501.86ms
Run: thread=DirWalkNonBlocking-ForkJoinPool-1-worker-6 elapsed=1504.18ms
Run: thread=DirWalkNonBlocking-ForkJoinPool-1-worker-5 elapsed=1502.50ms
Run: thread=DirWalkNonBlocking-ForkJoinPool-1-worker-4 elapsed=1502.93ms
Run: thread=DirWalkNonBlocking-ForkJoinPool-1-worker-3 elapsed=1502.35ms
Run: thread=DirWalkNonBlocking-ForkJoinPool-1-worker-2 elapsed=1502.06ms
Run: thread=DirWalkNonBlocking-ForkJoinPool-1-worker-1 elapsed=1501.74ms
Run: thread=DirWalkNonBlocking-ForkJoinPool-1-worker-0 elapsed=1501.29ms
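The output shows the timer problem directly: twelve callbacks ran, but only eight "Run:" lines are printed. The start and end maps are keyed by thread name, and the pool reuses its worker threads, so every later task on a thread overwrites the entry left by the earlier one. One way around this, sketched below under assumptions, is to key the measurement by task rather than by thread and to let each task record its own elapsed time. The names PerTaskTimerSketch, elapsedByTask and timed() are hypothetical, the task names stand in for file paths, and a fixed pool of two threads is used only to force thread reuse.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class PerTaskTimerSketch {

    // Elapsed nanoseconds keyed by task name (for example the file path),
    // not by thread name, so reused threads cannot overwrite each other.
    static final Map<String, Long> elapsedByTask = new ConcurrentHashMap<>();

    // Wraps any Callable so that its own run time is recorded under taskName.
    static <T> Callable<T> timed(String taskName, Callable<T> work) {
        return () -> {
            long startNs = System.nanoTime();
            try {
                return work.call();
            } finally {
                // Recorded even if the wrapped work throws.
                elapsedByTask.put(taskName, System.nanoTime() - startNs);
            }
        };
    }

    public static void main(String[] args) throws Exception {
        // Fewer threads than tasks, so each thread runs several tasks.
        ExecutorService executor = Executors.newFixedThreadPool(2);
        List<Future<Boolean>> futures = new ArrayList<>();
        for (int i = 1; i <= 5; i++) {
            String name = "file-" + i + ".pdf"; // hypothetical task name
            futures.add(executor.submit(timed(name, () -> {
                TimeUnit.MILLISECONDS.sleep(100);
                return true;
            })));
        }
        for (Future<Boolean> f : futures) {
            f.get(); // wait for each task to finish
        }
        executor.shutdown();
        // One line per task, regardless of which thread ran it.
        elapsedByTask.forEach((task, ns) ->
                System.out.printf("Run: task=%s elapsed=%.2fms%n", task, ns / 1e6));
    }
}
```

In DirWalkTest the same effect could probably be had by using the file path as the map key, or by having the Callable return its elapsed time instead of Boolean so that no shared map is needed at all.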