25

我需要为 Web 服务器实现全局对象收集统计信息。我有Statistics 单例,它有方法addSample(long sample),随后调用updateMax. 这显然必须是线程安全的。我有这种方法可以更新整个统计信息的最大值:

AtomicLong max;

private void updateMax(long sample) {
    while (true) {
        long curMax = max.get();
        if (curMax < sample) {
            boolean result = max.compareAndSet(curMax, sample);
            if (result) break;
        } else {
            break;
        }
    }
}

这个实现正确吗?我正在使用 java.util.concurrent,因为我相信它会比 simple 更快synchronized。有没有其他/更好的方法来实现这个?

4

5 回答 5

18

从 Java 8 开始,LongAccumulator被引入。建议为

当多个线程更新用于收集统计信息等目的的公共值时,此类通常比 AtomicLong 更可取,而不是用于细粒度的同步控制。在低更新争用下,这两个类具有相似的特征。但是在高竞争下,这个类的预期吞吐量明显更高,代价是更高的空间消耗。

您可以按如下方式使用它:

LongAccumulator maxId = new LongAccumulator(Long::max, 0); //replace 0 with desired initial value
maxId.accumulate(newValue); //from each thread
于 2018-02-28T10:39:09.213 回答
13

我认为这是正确的,但为了清楚起见,我可能会稍微重写一下,并且肯定会添加评论:

private void updateMax(long sample) {
    while (true) {
        long curMax = max.get();
        if (curMax >= sample) {
            // Current max is higher, so whatever other threads are
            // doing, our current sample can't change max.
            break;
        }

        // Try updating the max value, but only if it's equal to the
        // one we've just seen. We don't want to overwrite a potentially
        // higher value which has been set since our "get" call.
        boolean setSuccessful = max.compareAndSet(curMax, sample);

        if (setSuccessful) {
            // We managed to update the max value; no other threads
            // got in there first. We're definitely done.
            break;
        }

        // Another thread updated the max value between our get and
        // compareAndSet calls. Our sample can still be higher than the
        // new value though - go round and try again.
    }
}

编辑:通常我至少会先尝试同步版本,并且只有在我发现它导致问题时才使用这种无锁代码。

于 2011-05-20T12:53:45.573 回答
7

使用 Java 8,您可以利用函数式接口和一个简单的 lamda 表达式来解决这个问题,只需一行,无需循环:

private void updateMax(long sample) {
    max.updateAndGet(curMax -> (sample > curMax) ? sample : curMax);
}

解决方案使用updateAndGet(LongUnaryOperator)方法。当前值包含在条件运算符中curMax并使用条件运算符执行简单测试,如果样本值大于当前最大值,则将当前最大值替换为样本值。

于 2017-04-04T14:43:05.313 回答
3

好像您没有选择答案,这是我的:

// while the update appears bigger than the atomic, try to update the atomic.
private void max(AtomicDouble atomicDouble, double update) {
    double expect = atomicDouble.get();
    while (update > expect) {
        atomicDouble.weakCompareAndSet(expect, update);
        expect = atomicDouble.get();
    }
}

它或多或少与接受的答案相同,但不使用breakwhile(true)我个人不喜欢。

编辑:刚刚DoubleAccumulator在 java 8 中发现。文档甚至说这是针对像您这样的汇总统计问题:

DoubleAccumulator max = new DoubleAccumulator(Double::max, Double.NEGATIVE_INFINITY);
parallelStream.forEach(max::accumulate);
max.get();
于 2016-09-03T09:48:22.503 回答
2

我相信您所做的是正确的,但这是一个更简单的版本,我也认为是正确的。

private void updateMax(long sample){
      //this takes care of the case where between the comparison and update steps, another thread updates the max

      //For example:
      //if the max value is set to a higher max value than the current value in between the comparison and update step
      //sample will be the higher value from the other thread
      //this means that the sample will now be higher than the current highest (as we just set it to the value passed into this function)
      //on the next iteration of the while loop, we will update max to match the true max value
      //we will then fail the while loop check, and be done with trying to update.
      while(sample > max.get()){
          sample = max.getAndSet(sample);  
      }
}
于 2012-01-13T20:33:21.230 回答