我有一个生成器类,它拥有一个线程,其中确定了要生成的许多“记录”,然后生成那么多记录(这些记录被放置在 BlockingQueue 中以供另一个线程检索)。
我希望另一个线程知道将生成多少条记录(用于合理的进度报告等)。
似乎 Future 为我提供了我所追求的接口,但我是 Java 新手,不确定实现它的惯用方式。
我的背景是 C++/Win32,所以我通常会使用 win32“事件”(由CreateEvent(0, true, false, 0)
、SetEvent
和WaitForSingleObject
为我的信号和等待实现创建)。我注意到 Java 有一个CountDownLatch
,但不知何故这感觉比我所追求的要重(有点类似于在我真正想要一个布尔值时使用 int ),并且为此目的似乎不直观(无论如何对我来说)。
所以这是我使用 CountDownLatch 和 Future 的代码。我在这里稍微提炼了我的真实代码(删除了不相关的实现细节并忽略了所有错误处理)。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public abstract class Generator {
private CountDownLatch numRecordsSignal = new CountDownLatch(1);
private int numRecords;
private BlockingQueue<Record> queue = new LinkedBlockingQueue<Record>();
public Generator() {
new Thread(new Runnable() {
@Override
public void run() {
numRecords = calculateNumRecords();
numRecordsSignal.countDown();
for (Record r : generateRecords()) {
try {
queue.put(r);
} catch (InterruptedException e) {
// [ ... snip ... ]
}
}
}
}).start();
}
public Future<Integer> numRecords() {
return new Future<Integer>() {
// Ignore cancel for now (It wouldn't make sense to cancel
// just this part of the Generator's work, anyway).
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
public Integer get() throws InterruptedException {
numRecordsSignal.await();
return numRecords;
}
public Integer get(long timeout, TimeUnit unit)
throws InterruptedException {
numRecordsSignal.await(timeout, unit);
return numRecords;
}
public boolean isCancelled() {
return false;
}
public boolean isDone() {
// Since we can't cancel, just check the state of the
// signal
return numRecordsSignal.getCount() == 0;
}
};
}
public Record nextRecord() throws InterruptedException {
return queue.take();
}
/** --- Boring stuff below this line --- */
public interface Record { }
protected abstract int calculateNumRecords();
protected abstract Iterable<Record> generateRecords();
}
现在我的实际问题:
- 有没有比
CountDownLatch
单发信号更好的机制? - 我希望调用者能够等待或轮询结果,但不需要他们能够取消操作。Future 是暴露这些东西的正确方法吗?
- 这些东西看起来特别“非Java”吗?我完全走错了吗?
编辑:
澄清一下,我希望调用者能够执行以下操作:
Generator gen = new Generator();
Integer numRecords = gen.numRecords().get(); // This call might block waiting for the result
numRecords = gen.numRecords().get(); // This call will never block, as the result is already available.
这只是我试图实现的一个缓慢初始化的值。一旦满足“初始化”条件,它就应该锁存。一旦知道该值,就不会对其进行重新评估。