9

在这些程序中,我应该选择哪一个而不是另一个,为什么?一般来说,问题是为什么我应该选择使用 PriorityBlockingQueue 而不是 PriorityQueue。

优先阻塞队列

import java.util.concurrent.PriorityBlockingQueue;

public class PriorityBlockingQueueExample {

    static PriorityBlockingQueue<String> priorityQueue = new PriorityBlockingQueue<String>();
    public static void main(String[] args) {

        new Thread(){
            public void run(){
                try {
                System.out.println(priorityQueue.take() +" is removed from priorityQueue object");
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }    
            }
        }.start();
        new Thread(){
            public void run(){
                priorityQueue.add("string variable");
                System.out.println("Added an element to the queue");
            }
        }.start();
    }
}

在这些程序中,我应该选择哪一个而不是另一个,为什么?一般来说,问题是为什么我应该选择使用 PriorityBlockingQueue 而不是 PriorityQueue。优先队列

import java.util.PriorityQueue;

public class PriorityQueueTest {

    static PriorityQueue<String> priorityQueue = new PriorityQueue<String>();
    private static Object lock = new Object();
    public static void main(String[] args) {

        new Thread(){
            public void run(){
                synchronized(lock){     
                    try {
                        while(priorityQueue.isEmpty()){lock.wait();}
                            System.out.println(priorityQueue.remove() +" is removed from priorityQueue object");
                            lock.notify();
                    } catch (InterruptedException e) {
                            //  TODO Auto-generated catch block
                            e.printStackTrace();
                    }
                }
            }
        }.start();
        new Thread(){
            public void run(){
                synchronized(lock){
                    priorityQueue.add("string variable");
                    System.out.println("Added an element to the queue");
                    lock.notify();
                }
            }
        }.start();
    }
}
4

3 回答 3

19

如果队列为空,则正常访问时Queue将返回,而如果队列为空,则阻塞,直到有值可用。nullBlockingQueue

您正在使用的队列中的优先级部分仅表示以特定顺序从队列中读取项目(如果它们实现则自然Comparable或根据 a Comparator)。

通常,您应该依赖于抽象类型,要么PriorityQueue要么BlockingQueue。如果您的代码需要了解这两个概念,则可能需要重新考虑。

PriorityQueue您可能需要归结为消息排序的原因有很多。例如,在作业队列上,您可能希望能够赋予这些作业优先级。也就是说,通常处理作业的代码应该与订单无关。

通常,BlockingQueue您处于工作线程的领域,正在处理排队的工作,当没有工作要做时,这些线程可以被阻塞,直到工作变得可用。就像 a 的示例一样PriorityQueue,调用代码可能对此不可知,尽管您可能希望使用并非总是如此的某种等待超时。

于 2013-05-22T13:50:54.293 回答
9

PriorityBlockingQueue 与 JDK 5 中的并发包一起添加,请参见:http ://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/package-summary.html

它基本上是在做你为 PriorityQueue 编写的额外代码,在你的队列周围添加通常需要的同步/等待/通知。因此,名称的“阻塞”部分被添加以暗示线程将阻塞等待,直到队列中有可用的项目。

如果您的应用程序可以在 JDK 5 或更高版本上运行,我会使用 PriorityBlockingQueue。

于 2013-05-22T13:51:28.103 回答
2

我知道这是一个老话题,但我看到您没有考虑优先级队列的并发实现。

尽管 java 的集合框架没有,但它确实有足够的构建块来创建一个:

public class ConcurrentSkipListPriorityQueue<T> implements Queue<T> {

    private ConcurrentSkipListMap<T, Boolean> values;

    public ConcurrentSkipListPriorityQueue(Comparator<? super T> comparator) {
        values = new ConcurrentSkipListMap<>(comparator);
    }

    public ConcurrentSkipListPriorityQueue() {
        values = new ConcurrentSkipListMap<>();
    }

    @Override
    public boolean add(T e) {
        values.put(e, Boolean.TRUE);
        return true;
    }

    @Override
    public boolean offer(T e) {
        return add(e);
    }

    @Override
    public T remove() {
        while (true) {
            final T v = values.firstKey();
            if (values.remove(v)) {
                return v;
            }
        }
    }

    @Override
    public T poll() {

        try {
            while (true) {
                if (values.isEmpty()) {
                    return null;
                }
                final T v = values.firstKey();
                if (values.remove(v)) {
                    return v;
                }
            }
        } catch (NoSuchElementException ex) {
            return null; // poll should not throw an exception.. 
        }
    }

    @Override
    public T element() {
        return values.firstKey();
    }

    @Override
    public T peek() {
        if (values.isEmpty()) {
            return null;
        }

        try {
            return element();
        } catch (NoSuchElementException ex) {
            return null;
        }
    }

    @Override
    public int size() {
        return values.size();
    }

    @Override
    public boolean isEmpty() {
        return values.isEmpty();
    }

    @Override
    public boolean contains(Object o) {
        return values.containsKey(o);
    }

    @Override
    public Iterator<T> iterator() {
        return values.keySet().iterator();
    }

    @Override
    public Object[] toArray() {
        return values.keySet().toArray();
    }

    @Override
    public <T> T[] toArray(T[] a) {
        return values.keySet().toArray(a);
    }

    @Override
    public boolean remove(Object o) {
        return values.remove(o);
    }

    @Override
    public boolean containsAll(Collection<?> c) {
        return values.keySet().containsAll(c);
    }

    @Override
    public boolean addAll(Collection<? extends T> c) {

        boolean changed = false;

        for (T i : c) {
            changed |= add(i);
        }

        return changed;
    }

    @Override
    public boolean removeAll(Collection<?> c) {
        return values.keySet().removeAll(c);
    }

    @Override
    public boolean retainAll(Collection<?> c) {
        return values.keySet().retainAll(c);
    }

    @Override
    public void clear() {
        values.clear();
    }

}

此队列基于跳过列表,将其所有操作委托给 ConcurrentSkipListMap 类。它允许来自多个线程的非阻塞并发访问。

于 2015-06-11T13:50:16.857 回答