1

我创建了一个并发的递归目录遍历和文件处理程序,它有时会在所有并行计算完成后挂起,但“主”线程永远不会继续执行其他任务。

代码基本上是一个 fork-join 风格的并发聚合器,并行聚合完成后,它应该在 Swing 窗口中显示结果。聚合的问题在于它需要生成一棵树并将叶子节点的统计信息向上聚合到层次结构中。

我确定我犯了并发错误,但找不到。我在帖子末尾包含了我的代码的相关部分(为简洁起见,删除了代码注释,对 150 行感到抱歉,如果需要,我可以将其移至外部位置)。

上下文:Java 6u13、Windows XP SP3、Core 2 Duo CPU。

我的问题是:

这种随机挂起的原因可能是什么?

是否有更好的方法来进行并发目录遍历,也许是以已经存在的库的形式?

Doug Lea(或 Java 7)的 fork-join 框架会是一个更好的聚合/目录遍历框架,如果是这样,我应该如何重新考虑我的实现 - 在概念级别?

感谢您的时间。

和代码摘录:

private static JavaFileEvaluator[] processFiles(File[] files) 
throws InterruptedException {
    CountUpDown count = new CountUpDown();
    ThreadPoolExecutor ex = (ThreadPoolExecutor)Executors
    .newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    JavaFileEvaluator[] jfes = new JavaFileEvaluator[files.length];
    for (int i = 0; i < jfes.length; i++) {
        count.increment();
        jfes[i] = new JavaFileEvaluator(files[i], count, ex);
        ex.execute(jfes[i]);
    }
    count.await();
    for (int i = 0; i < jfes.length; i++) {
        count.increment();
        final JavaFileEvaluator jfe = jfes[i];
        ex.execute(new Runnable() {
            public void run() {
                jfe.aggregate();
            }
        });

    }
    // -------------------------------------
    // this await sometimes fails to wake up
    count.await(); // <---------------------
    // -------------------------------------
    ex.shutdown();
    ex.awaitTermination(0, TimeUnit.MILLISECONDS);
    return jfes;
}
public class JavaFileEvaluator implements Runnable {
    private final File srcFile;
    private final Counters counters = new Counters();
    private final CountUpDown count;
    private final ExecutorService service;
    private List<JavaFileEvaluator> children;
    public JavaFileEvaluator(File srcFile, 
            CountUpDown count, ExecutorService service) {
        this.srcFile = srcFile;
        this.count = count;
        this.service = service;
    }
    public void run() {
        try {
            if (srcFile.isFile()) {
                JavaSourceFactory jsf = new JavaSourceFactory();
                JavaParser jp = new JavaParser(jsf);
                try {
                    counters.add(Constants.FILE_SIZE, srcFile.length());
                    countLines();
                    jp.parse(srcFile);
                    Iterator<?> it = jsf.getJavaSources();
                    while (it.hasNext()) {
                        JavaSource js = (JavaSource)it.next();
                        js.toString();
                        processSource(js);
                    }
                // Some catch clauses here
                }
            } else
            if (srcFile.isDirectory()) {
                processDirectory(srcFile);
            }
        } finally {
            count.decrement();
        }
    }
    public void processSource(JavaSource js) {
        // process source, left out for brevity
    }
    public void processDirectory(File dir) {
        File[] files = dir.listFiles(new FileFilter() {
            @Override
            public boolean accept(File pathname) {
                return 
                (pathname.isDirectory() && !pathname.getName().startsWith("CVS") 
                 && !pathname.getName().startsWith("."))
                || (pathname.isFile() && pathname.getName().endsWith(".java") 
                 && pathname.canRead());
            }
        });
        if (files != null) {
            Arrays.sort(files, new Comparator<File>() {
                @Override
                public int compare(File o1, File o2) {
                    if (o1.isDirectory() && o2.isFile()) {
                        return -1;
                    } else
                    if (o1.isFile() && o2.isDirectory()) {
                        return 1;
                    }
                    return o1.getName().compareTo(o2.getName());
                }
            });
            for (File f : files) {
                if (f.isFile()) {
                    counters.add(Constants.FILE, 1);
                } else {
                    counters.add(Constants.DIR, 1);
                }
                JavaFileEvaluator ev = new JavaFileEvaluator(f, count, service);
                if (children == null) {
                    children = new ArrayList<JavaFileEvaluator>();
                }
                children.add(ev);
                count.increment();
                service.execute(ev);
            }
        }
    }
    public Counters getCounters() {
        return counters;
    }
    public boolean hasChildren() {
        return children != null && children.size() > 0;
    }
    public void aggregate() {
        // recursively aggregate non-leaf nodes
        if (!hasChildren()) {
            count.decrement();
            return;
        }
        for (final JavaFileEvaluator e : children) {
            count.increment();
            service.execute(new Runnable() {
                @Override
                public void run() {
                    e.aggregate();
                }
            });
        }
        count.decrement();
    }
}
public class CountUpDown {
    private final Lock lock = new ReentrantLock();
    private final Condition cond = lock.newCondition();
    private final AtomicInteger count = new AtomicInteger();
    public void increment() {
        count.incrementAndGet();
    }
    public void decrement() {
        int value = count.decrementAndGet();
        if (value == 0) {
            lock.lock();
            try {
                cond.signalAll();
            } finally {
                lock.unlock();
            }
        } else
        if (value < 0) {
            throw new IllegalStateException("Counter < 0 :" + value);
        }
    }
    public void await() throws InterruptedException {
        lock.lock();
        try {
            if (count.get() > 0) {
                cond.await();
            }
        } finally {
            lock.unlock();
        }
    }
}

编辑在 JavaSourceEvaluator 中添加了 hasChildren() 方法。

4

1 回答 1

1

在 JavaFileEvaluator 的聚合方法中,count.decrement() 不会在 finally 块中调用。如果在聚合函数中抛出任何 RuntimeExceptions(可能在 hasChildren 方法中,我没有看到它的主体?),那么对 decrement 的调用将永远不会发生,并且 CountUpDown 将无限期地保持等待。这可能是您看到的随机挂起的原因。

对于第二个问题,我不知道 java 中有任何库可以做到这一点,但我没有真正看过,很抱歉没有回答,但这不是我以前有机会使用的东西。

至于第三个问题,我认为无论是使用别人提供的 fork-join 框架,还是继续提供自己的并发框架,最大的收获是将执行遍历目录工作的逻辑与与管理并行性有关的逻辑。您提供的代码使用 CountUpDown 类来跟踪所有线程何时完成,并且您最终会在处理目录遍历的整个方法中调用递增/递减,这将导致追踪错误的噩梦。迁移到 java7 fork-join 框架将迫使您创建一个仅处理实际遍历逻辑的类,并将并发逻辑留给框架,这可能是您的好方法。另一种选择是继续使用你在这里的东西,

于 2009-06-29T18:06:02.687 回答