27

我正在实现请求实例的 FIFO 队列(为速度而预先分配的请求对象),并开始使用 add 方法上的“同步”关键字。该方法很短(检查固定大小缓冲区中是否有空间,然后将值添加到数组)。使用visualVM,线程似乎比我喜欢的更频繁地阻塞(准确地说是“监视器”)。因此,我将代码转换为使用 AtomicInteger 值来跟踪当前大小,然后在 while 循环中使用 compareAndSet()(就像 AtomicInteger 在内部对诸如 incrementAndGet() 之类的方法所做的那样)。代码现在看起来更长了。

我想知道的是,使用同步和较短的代码与没有同步关键字的较长代码相比,性能开销是多少(所以永远不应该阻塞锁)。

这是带有 synchronized 关键字的旧 get 方法:

public synchronized Request get()
{
    if (head == tail)
    {
        return null;
    }
    Request r = requests[head];
    head = (head + 1) % requests.length;
    return r;
}

这是没有 synchronized 关键字的新 get 方法:

public Request get()
{
    while (true)
    {
        int current = size.get();
        if (current <= 0)
        {
            return null;
        }
        if (size.compareAndSet(current, current - 1))
        {
            break;
        }
    }

    while (true)
    {
        int current = head.get();
        int nextHead = (current + 1) % requests.length;
        if (head.compareAndSet(current, nextHead))
        {
            return requests[current];
        }
    }
}

我的猜测是 synchronized 关键字更糟,因为即使代码更短,也有阻塞锁的风险(可能导致线程上下文切换等)。

谢谢!

4

4 回答 4

34

我的猜测是 synchronized 关键字更糟,因为有阻塞锁的风险(可能导致线程上下文切换等)

是的,在一般情况下你是对的。Java Concurrency in Practice在第 15.3.2 节中讨论了这一点:

[...] 在高争用级别上,锁定的性能往往优于原子变量,但在更现实的争用级别上,原子变量的性能优于锁。这是因为锁通过挂起线程来对争用做出反应,从而减少 CPU 使用率和共享内存总线上的同步流量。(这类似于生产者-消费者设计中的阻塞生产者如何减少消费者的负载,从而让他们赶上。)另一方面,使用原子变量,争用管理被推回调用类。像大多数基于 CAS 的算法一样,AtomicPseudoRandom通过立即重试来对争用做出反应,这通常是正确的方法,但在高争用环境中只会产生更多争用。

在我们谴责AtomicPseudoRandom写得不好或原子变量与锁相比是一个糟糕的选择之前,我们应该意识到图 15.1 中的争用程度高得不切实际:没有真正的程序除了争夺锁或原子变量什么都不做。在实践中,原子倾向于比锁更好地扩展,因为原子更有效地处理典型的争用级别。

锁和原子在不同争用级别上的性能逆转说明了各自的优势和劣势。低到中等的争用,原子提供更好的可扩展性;对于高争用,锁提供更好的争用避免。(基于 CAS 的算法在单 CPU 系统上也优于基于锁的算法,因为 CAS 在单 CPU 系统上总是成功的,除非在读-修改-写操作中间线程被抢占的不太可能的情况。 )

(在文中提到的数字上,图 15.1 表明 AtomicInteger 和 ReentrantLock 在竞争高时的性能或多或少相等,而图 15.2 表明,在中等竞争下,前者的性能比后者高出 2-3 倍.)

更新:关于非阻塞算法

正如其他人所指出的,非阻塞算法虽然可能更快,但更复杂,因此更难正确处理。JCiA 第 15.4 节的提示:

良好的非阻塞算法以许多常见的数据结构而闻名,包括堆栈、队列、优先级队列和哈希表,尽管设计新的算法最好留给专家来完成。

非阻塞算法比基于锁的算法复杂得多。创建非阻塞算法的关键是弄清楚如何将原子更改的范围限制为单个变量,同时保持数据一致性。在诸如队列之类的链接集合类中,您有时可以将状态转换表示为对单个链接的更改并使用AtomicReference表示必须以原子方式更新的每个链接。

于 2010-08-24T12:19:40.843 回答
4

我想知道 jvm 在真正挂起线程之前是否已经做了一些旋转。它预计像您这样写得很好的关键部分非常简短并且几乎可以立即完成。因此,在放弃和挂起线程之前,它应该乐观地等待(我不知道)几十个循环。如果是这种情况,它的行为应该与您的第二个版本相同。

探查器显示的内容可能与全速在 jvm 中实际发生的情况非常不同,并带有各种疯狂的优化。最好在没有分析器的情况下测量和比较吞吐量。

于 2010-08-24T20:07:38.110 回答
1

在进行这种同步优化之前,您确实需要一个分析器来告诉您这是绝对必要的。

是的,在某些情况下同步可能比原子操作慢,但比较你的原始方法和替换方法。前者非常清晰且易于维护,后者肯定更复杂。因此,可能存在非常微妙的并发错误,在初始测试期间您不会发现这些错误。我已经看到了一个问题,size并且head确实可能​​不同步,因为尽管这些操作中的每一个都是原子的,但组合不是,有时这可能会导致状态不一致。

所以,我的建议:

  1. 开始简单
  2. 轮廓
  3. 如果性能足够好,保持简单的实现不变
  4. 如果您需要提高性能,那么开始变得聪明(可能首先使用更专业的锁),然后TESTTESTTEST
于 2010-08-24T12:38:41.943 回答
0

这是忙等待锁的代码。

public class BusyWaitLock
{
    private static final boolean LOCK_VALUE = true;
    private static final boolean UNLOCK_VALUE = false;
    private final static Logger log = LoggerFactory.getLogger(BusyWaitLock.class);

    /**
     * @author Rod Moten
     *
     */
    public class BusyWaitLockException extends RuntimeException
    {

        /**
         * 
         */
        private static final long serialVersionUID = 1L;

        /**
         * @param message
         */
        public BusyWaitLockException(String message)
        {
            super(message);
        }



    }

    private AtomicBoolean lock = new AtomicBoolean(UNLOCK_VALUE);
    private final long maximumWaitTime ; 

    /**
     * Create a busy wait lock with that uses the default wait time of two minutes.
     */
    public BusyWaitLock()
    {
        this(1000 * 60 * 2); // default is two minutes)
    }

    /**
     * Create a busy wait lock with that uses the given value as the maximum wait time.
     * @param maximumWaitTime - a positive value that represents the maximum number of milliseconds that a thread will busy wait.
     */
    public BusyWaitLock(long maximumWaitTime)
    {
        if (maximumWaitTime < 1)
            throw new IllegalArgumentException (" Max wait time of " + maximumWaitTime + " is too low. It must be at least 1 millisecond.");
        this.maximumWaitTime = maximumWaitTime;
    }

    /**
     * 
     */
    public void lock ()
    {
        long startTime = System.currentTimeMillis();
        long lastLogTime = startTime;
        int logMessageCount = 0;
        while (lock.compareAndSet(UNLOCK_VALUE, LOCK_VALUE)) {
            long waitTime = System.currentTimeMillis() - startTime;
            if (waitTime - lastLogTime > 5000) {
                log.debug("Waiting for lock. Log message # {}", logMessageCount++);
                lastLogTime = waitTime;
            }
            if (waitTime > maximumWaitTime) {
                log.warn("Wait time of {} exceed maximum wait time of {}", waitTime, maximumWaitTime);
                throw new BusyWaitLockException ("Exceeded maximum wait time of " + maximumWaitTime + " ms.");
            }
        }
    }

    public void unlock ()
    {
        lock.set(UNLOCK_VALUE);
    }
}
于 2011-02-04T15:26:35.027 回答