我有一些代码,我使用执行器和阻塞队列执行多个任务。结果必须作为迭代器返回,因为这是我工作的应用程序所期望的。但是,任务和添加到队列的结果之间存在 1:N 的关系,因此我不能使用ExecutorCompletionService。在调用 hasNext() 时,我需要知道所有任务何时完成并将所有结果添加到队列中,以便我可以停止从队列中检索结果。请注意,一旦项目被放入队列,另一个线程应该准备好使用(Executor.invokeAll(), 阻塞直到所有任务完成,这不是我想要的,也不是超时)。这是我的第一次尝试,我使用 AtomicInteger 只是为了证明这一点,即使它不起作用。有人可以帮助我了解如何解决这个问题吗?
public class ResultExecutor<T> implements Iterable<T> {
    private BlockingQueue<T> queue;
    private Executor executor;
    private AtomicInteger count;
    public ResultExecutor(Executor executor) {
        this.queue = new LinkedBlockingQueue<T>();
        this.executor = executor;
        count = new AtomicInteger();            
    }
    public void execute(ExecutorTask task) {
        executor.execute(task);
    }
    public Iterator<T> iterator() {
        return new MyIterator();
    }
    public class MyIterator implements Iterator<T> {
        private T current;          
        public boolean hasNext() {
            if (count.get() > 0 && current == null)
            {
                try {
                    current = queue.take();
                    count.decrementAndGet();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            return current != null;
        }
        public T next() {
            final T ret = current;
            current = null;
            return ret;
        }
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }
    public class ExecutorTask implements Runnable{
        private String name;
        public ExecutorTask(String name) {
            this.name = name;
        }
         private int random(int n)
         {
           return (int) Math.round(n * Math.random());
         }
        @SuppressWarnings("unchecked")
        public void run() {
            try {
                int random = random(500);
                Thread.sleep(random);
                queue.put((T) (name + ":" + random + ":1"));
                queue.put((T) (name + ":" + random + ":2"));
                queue.put((T) (name + ":" + random + ":3"));
                queue.put((T) (name + ":" + random + ":4"));
                queue.put((T) (name + ":" + random + ":5"));
                count.addAndGet(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }                   
        }           
    }       
}
调用代码如下所示:
    Executor e = Executors.newFixedThreadPool(2);
    ResultExecutor<Result> resultExecutor = new ResultExecutor<Result>(e);
    resultExecutor.execute(resultExecutor.new ExecutorTask("A"));
    resultExecutor.execute(resultExecutor.new ExecutorTask("B"));
    Iterator<Result> iter = resultExecutor.iterator();
    while (iter.hasNext()) {
        System.out.println(iter.next());
    }