我有一个为 Flink CEP 生成事件的生成器,代码如下。基本上,我正在使用Thread.sleep()
并且我在某处读到即使我们使用 java 的睡眠时间也不能少于 1 毫秒System.nanoTime()
。生成器的代码是
public class RR_interval_Gen extends RichParallelSourceFunction<RRIntervalStreamEvent> {
Integer InputRate ; // events/second
Integer Sleeptime ;
Integer NumberOfEvents;
public RR_interval_Gen(Integer inputRate, Integer numberOfEvents ) {
this.InputRate = inputRate;
Sleeptime = 1000 / InputRate;
NumberOfEvents = numberOfEvents;
}
@Override
public void run(SourceContext<RRIntervalStreamEvent> sourceContext) throws Exception {
long currentTime;
Random random = new Random();
int RRInterval;
int Sensor_id;
for(int i = 1 ; i <= NumberOfEvents ; i++) {
Sensor_id = 2;
currentTime = System.currentTimeMillis();
// int randomNum = rand.nextInt((max - min) + 1) + min;
RRInterval = 10 + random.nextInt((20-10)+ 1);
RRIntervalStreamEvent stream = new RRIntervalStreamEvent(Sensor_id,currentTime,RRInterval);
synchronized (sourceContext.getCheckpointLock())
{
sourceContext.collect(stream);
}
Thread.sleep(Sleeptime);
}
}
@Override
public void cancel() {
}
}
我将在这里用简单的话来说明我的要求。我希望生成器类生成事件,比如说 1200 Hz 的 ECG 流。该生成器将接受我们必须生成流的输入速率和总时间等参数。
到目前为止一切顺利,问题是我需要每秒发送超过 1000 个事件。如何通过使用生成值的生成器函数来做到这一点U[10,20]
?
另外,如果我在上面使用错误的方式生成 x 个事件/秒,请告诉我。
Sleeptime = 1000 / InputRate;
提前致谢