我需要测量软件系统从消息队列中消费消息的速率并定期报告。
具体来说,消息来自消息队列系统,我需要(每秒)报告在多个滚动窗口内收到的消息数量 - 例如最后一秒、最后 5 秒、最后 30 秒等。
虽然我确信我可以构建它,但我不确定我是否会以最有效的方式去做!我也确信有一些库可以做到这一点(我使用的是 JVM,所以我想到了 Apache Commons Math),但我什至不知道谷歌的正确用词!:-)
我需要测量软件系统从消息队列中消费消息的速率并定期报告。
具体来说,消息来自消息队列系统,我需要(每秒)报告在多个滚动窗口内收到的消息数量 - 例如最后一秒、最后 5 秒、最后 30 秒等。
虽然我确信我可以构建它,但我不确定我是否会以最有效的方式去做!我也确信有一些库可以做到这一点(我使用的是 JVM,所以我想到了 Apache Commons Math),但我什至不知道谷歌的正确用词!:-)
这是我基于指数平滑的解决方案。它不需要任何后台线程。您将为要跟踪的每个滚动窗口创建 1 个实例。对于每个相关事件,您将在每个实例上调用 newEvent。
public class WindowedEventRate {
private double normalizedRate; // event rate / window
private long windowSizeTicks;
private long lastEventTicks;
public WindowedEventRate(int aWindowSizeSeconds) {
windowSizeTicks = aWindowSizeSeconds * 1000L;
lastEventTicks = System.currentTimeMillis();
}
public double newEvent() {
long currentTicks = System.currentTimeMillis();
long period = currentTicks - lastEventTicks;
lastEventTicks = currentTicks;
double normalizedFrequency = (double) windowSizeTicks / (double) period;
double alpha = Math.min(1.0 / normalizedFrequency, 1.0);
normalizedRate = (alpha * normalizedFrequency) + ((1.0 - alpha) * normalizedRate);
return getRate();
}
public double getRate() {
return normalizedRate * 1000L / windowSizeTicks;
}
}
这就是我最终写的。
package com.example;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class BucketCounter {
private final Lock rollLock = new ReentrantLock();
private final int[] bucketSizes;
private final int[] buckets;
private final int[] intervals;
private final AtomicInteger incoming = new AtomicInteger(0);
public BucketCounter(int... bucketSizes) {
if (bucketSizes.length < 1) {
throw new IllegalArgumentException("Must specify at least one bucket size");
}
this.bucketSizes = bucketSizes;
this.buckets = new int[bucketSizes.length];
Arrays.sort(bucketSizes);
if (bucketSizes[0] < 1) {
throw new IllegalArgumentException("Cannot have a bucket of size < 1");
}
intervals = new int[bucketSizes[bucketSizes.length - 1]];
}
public int count(int n) {
return incoming.addAndGet(n);
}
public int[] roll() {
final int toAdd = incoming.getAndSet(0);
rollLock.lock();
try {
final int[] results = new int[buckets.length];
for (int i = 0, n = buckets.length; i < n; i++) {
results[i] = buckets[i] = buckets[i] - intervals[bucketSizes[i] - 1] + toAdd;
}
System.arraycopy(intervals, 0, intervals, 1, intervals.length - 1);
intervals[0] = toAdd;
return results;
} finally {
rollLock.unlock();
}
}
}
通过传递不同的时间增量(例如 1、5、30)对其进行初始化。然后安排一个后台线程在roll()
每个“时间段”调用。如果您每秒调用一次,那么您的存储桶为 1、5 和 30 秒。如果您每 5 秒调用一次,那么您的存储桶是 5、25 和 150 秒等。基本上,存储桶用“roll()
调用次数”表示)。
roll()
还会为您返回每个存储桶的当前计数数组。请注意,这些数字是原始计数,并不是每个时间间隔的平均值。如果您想衡量“比率”而不是“计数”,则需要自己进行划分。
最后,每次发生事件时,调用count()
. 我已经建立了一个包含其中一些的系统,我调用count(1)
每条消息来计算传入消息,count(message.size())
在每条消息上计算传入字节率等。
希望有帮助。
您可能可以将其实现为拦截器,因此搜索拦截器并结合消息队列产品名称和语言名称。