我正在尝试使用 java 实现水库采样算法。我有 N 个大小未知的数据流(来自到达接收器节点的传感器的读数)。为了简单起见,假设我有一个未知大小的流。
因此,其中一种水库采样算法建议创建一个大小为reservoirSize 的水库。假设它是 5。您获得的前五个读数,将它们存储在您的水库中。好的。现在,随着您获得越来越多的读数,每次读数都会生成一个从 0 到读数的随机数,如果该随机数小于水库大小,则将读数存储在水库 [randomNumber] 中。
所以可以说我有reservoirSize = 5,我刚刚得到了我的第10个读数。我将生成一个从 0 到 10 的随机数,如果该数字小于 5,我会将读数存储在随机数指向的位置。假设随机数是 3,所以我将读数 10 存储在水库 [3] 中。
public void sample (Vector pool, double Measurement, int streamIndex) {
if (streamIndex < ReservoirSize){
pool.addElement(Double.toString(Measurement));
}
else if ((randomIndex=(int)ranNum.nextInt((streamIndex+1)))<ReservoirSize) {
pool.setElementAt(Double.toString(Measurement), randomIndex);
}
}
这段代码的问题是,一旦 streamIndex 变得足够大(例如高于 4.000),我很少对任何读数进行采样。这确实是有道理的,因为生成从 0 到 4000 小于 5 的随机数的概率明显小于生成从 0 到 100 的随机数的概率,即小于 5。
我还从 Vitters 论文中实现了 AlgorthmR,并在此处描述了另一种方式:
Gregable ReservoirSampling
但所有实现都有同样的问题。流越大,采样频率就越小。因此,对于 0.5 秒的采样率,在我开始采样一小时后(这意味着已将大约 7000 个读数转发到汇节点),再过半小时即不会检测到测量量的变化,即读数表示更改将从水库中丢弃。
算法实现
public RSAlgorithmR() {
this.currentPool = null;
this.randomStoreatIndex = 0;
this.randomIndex = 0;
this.ranNum = new Random();
}
public void sample (LLNode cNode, double Measurement) {
int streamIndex = cNode.getStreamIndex();
int storeatIndex =cNode.getStoreatIndex();
if (streamIndex < ReservoirSize) {
cNode.data.addElement(Double.toString(Measurement));
if (streamIndex == ( ReservoirSize - 1) ) {
randomStoreatIndex = (int)ranNum.nextInt(ReservoirSize);
cNode.setStoreatIndex((int)randomStoreatIndex);
}
}
else {
if (storeatIndex == streamIndex) {
randomIndex=(int)ranNum.nextInt(ReservoirSize);
cNode.data.setElementAt(Double.toString(Measurement), randomIndex);
randomStoreatIndex = (int)ranNum.nextInt(streamIndex - ReservoirSize) + ReservoirSize;
cNode.setStoreatIndex(randomStoreatIndex);
System.out.println("Index:: "+streamIndex);
System.out.println("randomIndex:: " + randomIndex);
}
}
cNode.setStreamIndex();
};
Gregable 实现
public ReservoirSampler() {
this.currentPool = null;
this.randomIndex = 0;
this.ranProp = new Random();
this.ranInd = new Random();
}
public void sample (LLNode currentSpot, double humidityRead,
double temperatureRead, int streamIndex) {
double acceptancePropability = (double)ReservoirSize/streamIndex;
if (streamIndex < ReservoirSize){
currentSpot.humidityData.addElement(Double.toString(humidityRead));
currentSpot.temperatureData.addElement(Double.toString(temperatureRead));
}
else {
ranProp.setSeed(System.currentTimeMillis());
randomPropability=(double)ranProp.nextDouble();
if ( randomPropability < acceptancePropability){
ranInd.setSeed(System.currentTimeMillis());
randomIndex=(int)ranInd.nextInt((ReservoirSize));
currentSpot.humidityData.setElementAt(Double.toString(humidityRead),randomIndex);
currentSpot.temperatureData.setElementAt(Double.toString(temperatureRead),randomIndex);
}
}
}
这是算法的正常行为还是我在这里遗漏了什么?如果这是正常行为,有没有办法让它更“准确”地工作?