36

我看到了BlockingQueue的这些实现,无法理解它们之间的区别。到目前为止我的结论:

  1. 我永远不需要SynchronousQueue
  2. LinkedBlockingQueue确保 FIFO,BlockingQueue必须使用参数 true 创建才能使其 FIFO
  3. SynchronousQueue破坏了大多数集合方法(包含、大小等)

那么我什么时候需要SynchronousQueue呢?此实现的性能是否优于LinkedBlockingQueue

为了使它更复杂......为什么Executors.newCachedThreadPool使用 SynchronousQueue 而其他人(Executors.newSingleThreadExecutorExecutors.newFixedThreadPool)使用 LinkedBlockingQueue?

编辑

第一个问题解决了。但是我仍然不明白为什么Executors.newCachedThreadPool使用 SynchronousQueue 而其他人(Executors.newSingleThreadExecutorExecutors.newFixedThreadPool)使用 LinkedBlockingQueue?

我得到的是,使用 SynchronousQueue,如果没有空闲线程,生产者将被阻塞。但是由于线程的数量实际上是无限的(如果需要,将创建新的线程),这永远不会发生。那么为什么要使用 SynchronousQueue 呢?

4

3 回答 3

56

SynchronousQueue是一种非常特殊的队列——它在Queue.

因此,您可能仅在需要特定语义的特殊情况下才需要它,例如,单线程任务而不排队进一步的请求

使用的另一个原因SynchronousQueue是性能。的实现SynchronousQueue似乎已经过高度优化,所以如果您只需要一个集合点(例如Executors.newCachedThreadPool(),消费者是“按需”创建的,因此队列项目不会累积),您可以通过使用SynchronousQueue.

简单的综合测试表明,在简单的单生产者 - 单消费者场景中,双核机器吞吐量SynchronousQueue约为 20 倍,LinkedBlockingQueue队列ArrayBlockingQueue长度 = 1。当队列长度增加时,它们的吞吐量上升并几乎达到SynchronousQueue. 这意味着SynchronousQueue与其他队列相比,它在多核机器上的同步开销较低。但同样,只有在您需要伪装成Queue.

编辑:

这是一个测试:

public class Test {
    static ExecutorService e = Executors.newFixedThreadPool(2);
    static int N = 1000000;

    public static void main(String[] args) throws Exception {    
        for (int i = 0; i < 10; i++) {
            int length = (i == 0) ? 1 : i * 5;
            System.out.print(length + "\t");
            System.out.print(doTest(new LinkedBlockingQueue<Integer>(length), N) + "\t");
            System.out.print(doTest(new ArrayBlockingQueue<Integer>(length), N) + "\t");
            System.out.print(doTest(new SynchronousQueue<Integer>(), N));
            System.out.println();
        }

        e.shutdown();
    }

    private static long doTest(final BlockingQueue<Integer> q, final int n) throws Exception {
        long t = System.nanoTime();

        e.submit(new Runnable() {
            public void run() {
                for (int i = 0; i < n; i++)
                    try { q.put(i); } catch (InterruptedException ex) {}
            }
        });    

        Long r = e.submit(new Callable<Long>() {
            public Long call() {
                long sum = 0;
                for (int i = 0; i < n; i++)
                    try { sum += q.take(); } catch (InterruptedException ex) {}
                return sum;
            }
        }).get();
        t = System.nanoTime() - t;

        return (long)(1000000000.0 * N / t); // Throughput, items/sec
    }
}    

这是我机器上的结果:

在此处输入图像描述

于 2011-02-24T09:23:40.213 回答
5

当前,默认值ExecutorsThreadPoolExecutor基于)可以使用一组预先创建的固定大小的线程和BlockingQueue某个大小的任意溢出线程,或者在(且仅当)该队列已满时创建最大大小的线程。

这导致了一些令人惊讶的特性。例如,由于只有在达到队列容量时才会创建额外的线程,所以使用 a LinkedBlockingQueue(无界)意味着永远不会创建新线程,即使当前池大小为零。如果您使用 an ,ArrayBlockingQueue则仅当它已满时才会创建新线程,并且如果到那时池尚未清除空间,则后续作业很有可能被拒绝。

A 的SynchronousQueue容量为零,因此生产者阻塞,直到消费者可用或创建线程。这意味着尽管@axtavt 产生了令人印象深刻的数据,但从生产者的角度来看,缓存线程池的性能通常最差。

不幸的是,目前还没有一个很好的折衷实现的库版本,它可以在突发或活动期间创建线程,从一个最小值到某个最大值。你要么有一个可增长的池,要么有一个固定的池。我们内部有一个,但还没有准备好供公众使用。

于 2011-02-24T21:18:17.273 回答
4

缓存线程池按需创建线程。它需要一个队列,要么将任务传递给等待的消费者,要么失败。如果没有等待的消费者,它会创建一个新线程。SynchronousQueue 不保存元素,而是传递元素或失败。

于 2011-02-24T09:21:40.307 回答