4

我有一个生成器类,它拥有一个线程,其中确定了要生成的许多“记录”,然后生成那么多记录(这些记录被放置在 BlockingQueue 中以供另一个线程检索)。

我希望另一个线程知道将生成多少条记录(用于合理的进度报告等)。

似乎 Future 为我提供了我所追求的接口,但我是 Java 新手,不确定实现它的惯用方式。

我的背景是 C++/Win32,所以我通常会使用 win32“事件”(由CreateEvent(0, true, false, 0)SetEventWaitForSingleObject为我的信号和等待实现创建)。我注意到 Java 有一个CountDownLatch,但不知何故这感觉比我所追求的要重(有点类似于在我真正想要一个布尔值时使用 int ),并且为此目的似乎不直观(无论如何对我来说)。

所以这是我使用 CountDownLatch 和 Future 的代码。我在这里稍微提炼了我的真实代码(删除了不相关的实现细节并忽略了所有错误处理)。

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.Future;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    public abstract class Generator {

        private CountDownLatch numRecordsSignal = new CountDownLatch(1);

        private int numRecords;

        private BlockingQueue<Record> queue = new LinkedBlockingQueue<Record>();

        public Generator() {
            new Thread(new Runnable() {

                @Override
                public void run() {
                    numRecords = calculateNumRecords();
                    numRecordsSignal.countDown();

                    for (Record r : generateRecords()) {
                        try {
                            queue.put(r);
                        } catch (InterruptedException e) {
                            // [ ... snip ... ]
                        }
                    }                
                }
            }).start();
        }

        public Future<Integer> numRecords() {
            return new Future<Integer>() {
                // Ignore cancel for now (It wouldn't make sense to cancel 
                // just this part of the Generator's work, anyway).
                public boolean cancel(boolean mayInterruptIfRunning) { 
                    return false; 
                }

                public Integer get() throws InterruptedException {
                    numRecordsSignal.await();
                    return numRecords;
                }

                public Integer get(long timeout, TimeUnit unit) 
                        throws InterruptedException {
                    numRecordsSignal.await(timeout, unit);
                    return numRecords;
                }

                public boolean isCancelled() {
                    return false;
                }

                public boolean isDone() {
                    // Since we can't cancel, just check the state of the 
                    // signal
                    return numRecordsSignal.getCount() == 0;
                }
            };
        }

        public Record nextRecord() throws InterruptedException {
            return queue.take();
        }

        /** --- Boring stuff below this line --- */
        public interface Record { }

        protected abstract int calculateNumRecords();

        protected abstract Iterable<Record> generateRecords();                        
    }

现在我的实际问题:

  1. 有没有比CountDownLatch单发信号更好的机制?
  2. 我希望调用者能够等待或轮询结果,但不需要他们能够取消操作。Future 是暴露这些东西的正确方法吗?
  3. 这些东西看起来特别“非Java”吗?我完全走错了吗?

编辑:

澄清一下,我希望调用者能够执行以下操作:

    Generator gen = new Generator();
    Integer numRecords = gen.numRecords().get();  // This call might block waiting for the result
    numRecords = gen.numRecords().get(); // This call will never block, as the result is already available.

这只是我试图实现的一个缓慢初始化的值。一旦满足“初始化”条件,它就应该锁存。一旦知道该值,就不会对其进行重新评估。

4

5 回答 5

3

旁注

您不应该在构造函数中启动线程- 很可能在线程启动时未完全创建 Generator 对象,例如倒计时锁存器很可能为 null。您可以在构造函数中创建线程,但应该以单独的方法启动它。您的调用代码将变为:

Generator g = new Generator();
g.start();

你的问题

您正在自己重新实现 Future,在我看来,这既没有必要也不可取。我会重新设计这个类并通过一个执行器来Generator实现Callable<Integer>和运行它。这为您提供了几件事:

  • 从生成器中删除线程逻辑,这使您能够在调用堆栈的更高级别更有效地管理线程
  • 整数通过调用代码中的未来返回,您依赖 JDK 来处理实现
  • 我假设可以先填充队列然后返回整数
  • 您可以future.get()根据需要多次调用 - 它只会在第一次调用时阻塞。
public static void main(String[] args) {
    ExecutorService executor = Executors.newFixedThreadPool(1);
    Future<Integer> future = executor.submit(new GeneratorImpl()); //a concrete implementation of Generator
    int numRecords = 0;
    try {
        numRecords = future.get(); //you can use a get with timeout here
    } catch (ExecutionException e) {
        //an exception happened in Generator#call()
    } catch (InterruptedException e) {
        //handle it
    }

    //don't forget to call executor.shutdown() when you don't need it any longer
}

public abstract class Generator implements Callable<Integer> {

    private BlockingQueue<Record> queue = new LinkedBlockingQueue<Record>();

    @Override
    public Integer call() {
        int numRecords = calculateNumRecords();
        for (Record r : generateRecords()) {
            try {
                queue.put(r);
            } catch (InterruptedException e) {
                // [ ... snip ... ]
            }
        }
        return numRecords;
    }

    public Record nextRecord() throws InterruptedException {
        return queue.take();
    }

    /**
     * --- Boring stuff below this line ---
     */
    public interface Record {
    }

    protected abstract int calculateNumRecords();

    protected abstract Iterable<Record> generateRecords();
}

编辑

如果您需要尽快返回 numRecods,您可以在单独的线程中填充您的队列:

    public Integer call() {
        int numRecords = calculateNumRecords();
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (Record r : generateRecords()) {
                    try {
                        queue.put(r);
                    } catch (InterruptedException e) {
                        // [ ... snip ... ]
                    }
                }
            }
        }).start(); //returns immediately
        return numRecords;
    }
于 2012-08-02T09:50:40.883 回答
1

Java 线程的“WaitOnSingleEvent()”和“SetEvent()”的标准 Java 等效项是“wait()”、“notify()”和“notifyAll()”。

于 2012-08-02T06:25:41.207 回答
1

在查看了实现我自己的信号机制并遵循其他人做同样事情留下的面包屑痕迹之后,我遇到了AbstractQueuedSynchronizer的 javadoc ,其中包括一个“BooleanLatch”的代码片段,它完全满足了我的需求:

class BooleanLatch {
    private static class Sync extends AbstractQueuedSynchronizer {
        boolean isSignalled() { return getState() != 0; }

        protected int tryAcquireShared(int ignore) {
            return isSignalled()? 1 : -1;
        }

        protected boolean tryReleaseShared(int ignore) {
            setState(1);
           return true;
        }
    }

    private final Sync sync = new Sync();
    public boolean isSignalled() { return sync.isSignalled(); }
    public void signal()         { sync.releaseShared(1); }
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
 }

进行更多搜索后,我发现许多框架都包含 BooleanLatch(Apache Qpid 就是其中之一)。一些实现(例如 Atlassian 的)是自动重置的,这会使它们不适合我的需要。

于 2012-08-05T03:30:11.657 回答
0

如果我正确理解您的问题,标准观察者通知模式在这里会有所帮助。

于 2012-08-02T06:44:39.653 回答
0

对于这种情况下的单发信号,信号量更好,因为它记住了“信号”。条件对象 [wait() is on a condition] 不会记住信号。

 Semaphore numRecordsUpdated = new Semaphore(0);

在生成器中

  numRecordsUpdated.release();

在消费者

  numRecordsUpdated.acquire();
于 2012-08-02T07:39:40.820 回答