0

我试图从 yammer 指标中理解这段代码。混乱始于 trim 方法和在 update 和 getSnapShot 中对 trim 的调用。有人可以解释这里的逻辑,比如 15 分钟的滑动窗口吗?为什么要在将地图传递给 SnapShot 之前清除地图(这是计算窗口统计信息的地方)。

package com.codahale.metrics;

import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;


public class SlidingTimeWindowReservoir implements Reservoir {
    // allow for this many duplicate ticks before overwriting measurements
    private static final int COLLISION_BUFFER = 256;
    // only trim on updating once every N
    private static final int TRIM_THRESHOLD = 256;

    private final Clock clock;
    private final ConcurrentSkipListMap<Long, Long> measurements;
    private final long window;
    private final AtomicLong lastTick;
    private final AtomicLong count;


public SlidingTimeWindowReservoir(long window, TimeUnit windowUnit) {
    this(window, windowUnit, Clock.defaultClock());
}

public SlidingTimeWindowReservoir(long window, TimeUnit windowUnit, Clock clock) {
    this.clock = clock;
    this.measurements = new ConcurrentSkipListMap<Long, Long>();
    this.window = windowUnit.toNanos(window) * COLLISION_BUFFER;
    this.lastTick = new AtomicLong();
    this.count = new AtomicLong();
}

@Override
public int size() {
    trim();
    return measurements.size();
}

@Override
public void update(long value) {
    if (count.incrementAndGet() % TRIM_THRESHOLD == 0) {
        trim();
    }
    measurements.put(getTick(), value);
}

@Override
public Snapshot getSnapshot() {
    trim();
    return new Snapshot(measurements.values());
}

private long getTick() {
    for (; ; ) {
        final long oldTick = lastTick.get();
        final long tick = clock.getTick() * COLLISION_BUFFER;
        // ensure the tick is strictly incrementing even if there are duplicate ticks
        final long newTick = tick > oldTick ? tick : oldTick + 1;
        if (lastTick.compareAndSet(oldTick, newTick)) {
            return newTick;
        }
    }
}

private void trim() {
    measurements.headMap(getTick() - window).clear();
}
}
4

1 回答 1

1

文档中的两点信息

ConcurrentSkipListMap根据其键的自然顺序排序

这是保存所有测量的数据结构。这里的关键是long,基本上是当前时间。-> 按时间索引的测量按时间排序。

.headMap(K toKey)返回此地图部分的视图,其键严格小于toKey.

中的魔术代码getTick确保一个时间值永远不会被使用两次(oldTick + 1如果发生这种情况,只需使用)。COLLISION_BUFFER理解起来有点棘手,但它基本上确保即使Clock#getTick()返回相同的值,您也会获得不会与时钟的下一个刻度冲突的新值。

例如 Clock.getTick()返回 0 -> 修改为 0 * 256 = 0

Clock.getTick()返回 1 -> 修改为 1 * 256 = 256

-> 256 个值之间的空间。

现在trim()确实

measurements.headMap(getTick() - window).clear();

这会计算“当前时间”,减去时间窗口并使用该时间来获取比“窗口时间前”更早的地图部分。清除该部分也将在原始地图中清除它。它不是清除整个地图,只是清除那一部分。

-> trim 删除太旧的值。

每次update您需要删除旧值或地图变得太大时。当创建Snapshot相同的事情时,会发生这些旧值不包括在内。

无休止的 for 循环getTick是使用原子比较和设置方法的另一个技巧,以确保 - 一旦你准备好更新值 - 没有任何东西改变了两者之间的值。如果发生这种情况,整个循环将重新开始并刷新它的起始值。基本架构是

for (; ; ) {
    long expectedOldValue = atomic.get();
    // other threads can change the value of atomic here..

    long modified = modify(expectedOldValue);

    // we can only set the new value if the old one is still the same
    if (atomic.compareAndSet(expectedOldValue, modified)) {
        return modified;
    }
}
于 2013-08-13T13:28:36.043 回答