我认为您的用例的基本构建块是“锁存器”,类似于 CountDownLatch,但与 CountDownLatch 不同,它也允许增加计数。
这种闩锁的接口可能是
public interface Latch {
public void countDown();
public void countUp();
public void await() throws InterruptedException;
public int getCount();
}
计数的合法值为 0 及以上。await() 方法会让你阻塞直到计数降到零。
如果你有这样的闩锁,你的用例可以很容易地描述。我还怀疑可以在此解决方案中消除队列(边界)(执行器无论如何都提供了一个,因此有点多余)。我会将您的主要例程重写为
ExecutorService executor = Executors.newFixedThreadPool(numberOfCrawlers);
Latch latch = ...; // instantiate a latch
URL[] initialUrls = ...;
for (URL url: initialUrls) {
executor.execute(new URLCrawler(this, url, latch));
}
// now wait for all crawling tasks to finish
latch.await();
您的 URLCrawler 将以这种方式使用闩锁:
public class URLCrawler implements Runnable {
private final Latch latch;
public URLCrawler(..., Latch l) {
...
latch = l;
latch.countUp(); // increment the count as early as possible
}
public void run() {
try {
List<URL> secondaryUrls = crawl();
for (URL url: secondaryUrls) {
// submit new tasks directly
executor.execute(new URLCrawler(..., latch));
}
} finally {
// as a last step, decrement the count
latch.countDown();
}
}
}
至于闩锁实现,可能有多种实现,从基于 wait() 和 notifyAll() 的实现,一种使用 Lock 和 Condition 的实现,到使用 AbstractQueuedSynchronizer 的实现。我认为所有这些实现都非常简单。请注意,wait()-notifyAll() 版本和 Lock-Condition 版本将基于互斥,而 AQS 版本将使用 CAS(比较和交换),因此在某些情况下可能会更好地扩展。