18

BlockingQueue的文档说批量操作不是线程安全的,尽管它没有明确提到方法 drainTo()。

BlockingQueue 实现是线程安全的。所有排队方法都使用内部锁或其他形式的并发控制以原子方式实现其效果。但是,批量收集操作 addAll、containAll、retainAll 和 removeAll 不一定以原子方式执行,除非在实现中另外指定。因此,例如,addAll(c) 在仅添加 c 中的一些元素后可能会失败(抛出异常)。

drainTo() 方法的文档指定不能以线程安全的方式修改 BlockingQueue 的元素所排放到的集合。但是,它没有提到任何关于 drainTo() 操作是线程安全的。

从此队列中删除所有可用元素并将它们添加到给定集合中。此操作可能比重复轮询此队列更有效。尝试将元素添加到集合 c 时遇到的失败可能会导致在引发相关异常时元素既不在集合中,又不属于任何一个集合或两个集合。尝试将队列排空到自身会导致 IllegalArgumentException。此外,如果在操作正在进行时修改了指定的集合,则此操作的行为是未定义的。

那么,drainTo() 方法是线程安全的吗?换句话说,如果一个线程在阻塞队列上调用了 drainTo() 方法,而另一个线程在同一个队列上调用了 add() 或 put(),那么在这两个操作结束时队列的状态是否一致?

4

3 回答 3

18

我认为您混淆了“线程安全”和“原子”这两个术语。它们的意思不同。方法可以是线程安全的而不是原子的,也可以是原子的(对于单个线程)而不是线程安全的。

线程安全是一个橡胶术语,如果没有循环就很难定义。根据 Goetz 的说法,一个好的工作模型是一个方法是线程安全的,如果它在多线程上下文中使用时与在单线程上下文中运行时一样“正确”。弹性在于正确性是主观的,除非你有一个正式的规范来衡量。

相比之下,原子很容易定义。它只是意味着操作要么完全发生,要么根本不发生。

所以你的问题的答案drainTo()是线程安全的,但不是原子的。它不是原子的,因为它可能会在耗尽过程中抛出异常。但是,无论其他线程是否同时对队列执行操作,队列仍将处于一致状态。


(在上面的讨论中隐含了接口的具体实现BlockingQueue正确地实现了接口。如果没有,所有的赌注都没有了。)

于 2011-07-07T07:35:28.050 回答
6

drainTo()从某种意义上说是线程安全的,即同时发生的队列上的任何操作都不会改变结果,也不会破坏队列的状态。否则,该方法将毫无意义。

如果目标集合(添加结果的集合)做了一些“聪明”的事情,您可能会遇到问题。但是,由于您通常将队列排空到只有单个线程可以访问的集合,所以这更像是一个理论问题。

于 2011-07-07T08:10:13.410 回答
0

偶然发现了这个问题,并想添加一个实现信息。

来自PriorityBlockingQueue 的Java 8 源代码:

 /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = Math.min(size, maxElements);
            for (int i = 0; i < n; i++) {
                c.add((E) queue[0]); // In this order, in case add() throws.
                dequeue();
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

可以看到 aReentrantLock是用来锁定临界区的。方法poll()offer()使用的锁也是一样的。所以BlockingQueue在这种情况下 PriorityBlockingQueue 的实现确实是Blocking

于 2020-11-25T16:51:17.283 回答