2

我需要测量软件系统从消息队列中消费消息的速率并定期报告。

具体来说,消息来自消息队列系统,我需要(每秒)报告在多个滚动窗口内收到的消息数量 - 例如最后一秒、最后 5 秒、最后 30 秒等。

虽然我确信我可以构建它,但我不确定我是否会以最有效的方式去做!我也确信有一些库可以做到这一点(我使用的是 JVM,所以我想到了 Apache Commons Math),但我什至不知道谷歌的正确用词!:-)

4

3 回答 3

5

这是我基于指数平滑的解决方案。它不需要任何后台线程。您将为要跟踪的每个滚动窗口创建 1 个实例。对于每个相关事件,您将在每个实例上调用 newEvent。

public class WindowedEventRate {

  private double normalizedRate; // event rate / window
  private long windowSizeTicks;
  private long lastEventTicks;


  public WindowedEventRate(int aWindowSizeSeconds) {
    windowSizeTicks = aWindowSizeSeconds * 1000L;
    lastEventTicks = System.currentTimeMillis();
  }

  public double newEvent() {

    long currentTicks = System.currentTimeMillis();
    long period = currentTicks - lastEventTicks;
    lastEventTicks = currentTicks;
    double normalizedFrequency = (double) windowSizeTicks / (double) period;

    double alpha = Math.min(1.0 / normalizedFrequency, 1.0);
    normalizedRate = (alpha * normalizedFrequency) + ((1.0 - alpha) * normalizedRate);
    return getRate();
  }

  public double getRate() {
    return normalizedRate * 1000L / windowSizeTicks;
  }
}
于 2013-05-16T21:56:09.733 回答
2

这就是我最终写的。

package com.example;

import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class BucketCounter {
    private final Lock rollLock = new ReentrantLock();
    private final int[] bucketSizes;
    private final int[] buckets;
    private final int[] intervals;
    private final AtomicInteger incoming = new AtomicInteger(0);

    public BucketCounter(int... bucketSizes) {
        if (bucketSizes.length < 1) {
            throw new IllegalArgumentException("Must specify at least one bucket size");
        }

        this.bucketSizes = bucketSizes;
        this.buckets = new int[bucketSizes.length];

        Arrays.sort(bucketSizes);

        if (bucketSizes[0] < 1) {
            throw new IllegalArgumentException("Cannot have a bucket of size < 1");
        }

        intervals = new int[bucketSizes[bucketSizes.length - 1]];
    }

    public int count(int n) {
        return incoming.addAndGet(n);
    }

    public int[] roll() {
        final int toAdd = incoming.getAndSet(0);

        rollLock.lock();
        try {
            final int[] results = new int[buckets.length];

            for (int i = 0, n = buckets.length; i < n; i++) {
                results[i] = buckets[i] = buckets[i] - intervals[bucketSizes[i] - 1] + toAdd;
            }

            System.arraycopy(intervals, 0, intervals, 1, intervals.length - 1);
            intervals[0] = toAdd;

            return results;
        } finally {
            rollLock.unlock();
        }
    }
}

通过传递不同的时间增量(例如 1、5、30)对其进行初始化。然后安排一个后台线程在roll()每个“时间段”调用。如果您每秒调用一次,那么您的存储桶为 1、5 和 30 秒。如果您每 5 秒调用一次,那么您的存储桶是 5、25 和 150 秒等。基本上,存储桶用“roll()调用次数”表示)。

roll()还会为您返回每个存储桶的当前计数数组。请注意,这些数字是原始计数,并不是每个时间间隔的平均值。如果您想衡量“比率”而不是“计数”,则需要自己进行划分。

最后,每次发生事件时,调用count(). 我已经建立了一个包含其中一些的系统,我调用count(1)每条消息来计算传入消息,count(message.size())在每条消息上计算传入字节率等。

希望有帮助。

于 2013-05-16T16:54:57.817 回答
-1

您可能可以将其实现为拦截器,因此搜索拦截器并结合消息队列产品名称和语言名称。

于 2013-05-08T11:56:20.210 回答