I am trying to aggregate 60 seconds of data, keyed by its minute timestamp, with a maximum delay of 30 seconds.
DataStream<OHLChelp> ohlcAggStream = stockStream
        .assignTimestampsAndWatermarks(new TimestampExtractor(Time.seconds(30)))
        .map(new mapStockToOhlcHelp())
        .keyBy((KeySelector<OHLChelp, Long>) o -> o.getMinTime())
        .timeWindow(Time.seconds(60))
        .reduce(new aggregateOHLC());
// Map the complex object to a simpler one
DataStream<OHLCmodel> ohlcStreamAggregated = ohlcAggStream.map(new mapOHLCredToOHLCfin());
// Log the OHLC stream
ohlcStreamAggregated.writeAsText(outLogPath);
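For context, the per-minute reduce step can be pictured roughly as follows. This is only a sketch in plain Java without Flink. The `Ohlc` class, its fields, and `merge` are hypothetical stand-ins for `OHLChelp` and `aggregateOHLC`, whose real definitions are not shown in this post. It assumes partial bars are merged in arrival order.

```java
// Hypothetical stand-in for OHLChelp; the field names are assumptions.
final class Ohlc {
    final long minTime;   // minute bucket used as the key
    final double open;
    final double high;
    final double low;
    final double close;

    Ohlc(long minTime, double open, double high, double low, double close) {
        this.minTime = minTime;
        this.open = open;
        this.high = high;
        this.low = low;
        this.close = close;
    }

    // A single trade starts out as a bar whose four prices are all equal.
    static Ohlc ofTrade(long minTime, double price) {
        return new Ohlc(minTime, price, price, price, price);
    }

    // Combine two partial bars of the same minute; `this` is the earlier one.
    Ohlc merge(Ohlc later) {
        return new Ohlc(
                minTime,
                open,                           // open comes from the earlier bar
                Math.max(high, later.high),
                Math.min(low, later.low),
                later.close);                   // close comes from the later bar
    }
}
```

A reduce function in the pipeline would apply the same pairwise merge to every element that falls into one keyed window.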
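I am receiving data, and the watermarks and timestamps are being set. It seems, however, that the aggregated data is never sent to ohlcStreamAggregated, so it is never logged.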
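One way to reason about why nothing reaches the sink is to work out when an event-time window is allowed to fire. The sketch below is plain Java, not Flink code. It assumes the usual tumbling-window rule that a window `[start, start + size)` fires once the watermark reaches `start + size - 1`, and that window sizes are measured in milliseconds. It also assumes, which this post does not confirm, that `getTime()` returns epoch seconds. The class name, method names, and the sample timestamp are made up for illustration.

```java
final class WindowFiringSketch {
    // Start of the tumbling window containing the timestamp
    // (non-negative timestamps, no window offset).
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    // Smallest watermark at which the window containing `timestamp` may fire.
    static long firingWatermark(long timestamp, long size) {
        return windowStart(timestamp, size) + size - 1;
    }

    // Highest event timestamp that must be seen so that
    // (max timestamp - delay) reaches the firing watermark.
    static long requiredMaxTimestamp(long timestamp, long size, long delay) {
        return firingWatermark(timestamp, size) + delay;
    }

    public static void main(String[] args) {
        long windowSizeMs = 60_000L;              // Time.seconds(60) in milliseconds
        long firstTradeSeconds = 1_500_000_000L;  // made-up epoch-seconds value

        // Case 1: second-resolution timestamps passed through unchanged,
        // with the delay stored as 30 (seconds).
        long neededRaw = requiredMaxTimestamp(firstTradeSeconds, windowSizeMs, 30L);
        System.out.println("raw units to wait: " + (neededRaw - firstTradeSeconds));

        // Case 2: everything converted to milliseconds first.
        long firstTradeMs = firstTradeSeconds * 1000L;
        long neededMs = requiredMaxTimestamp(firstTradeMs, windowSizeMs, 30_000L);
        System.out.println("milliseconds to wait: " + (neededMs - firstTradeMs));
    }
}
```

Under these assumptions, the first case needs timestamps to advance by 60,029 units before the first window can close. If those units are really seconds, that is more than 16 hours of trades. The second case needs about 90 seconds.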
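public TimestampExtractor(Time maxDelayInterval) {
    if (maxDelayInterval.toMilliseconds() < 0) {
        throw new RuntimeException("This parameter must be positive or 0.");
    }
    // Note: dividing by 1000 stores the delay in seconds, not milliseconds
    this.maxDelayInterval = maxDelayInterval.toMilliseconds() / 1000;
    // Offset the initial maximum so that subtracting the delay cannot underflow
    this.currentMaxTimestamp = Long.MIN_VALUE + this.maxDelayInterval;
}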
@Override
public final Watermark getCurrentWatermark() {
    // The watermark trails the highest timestamp seen so far by the maximum delay
    long potentialWM = currentMaxTimestamp - maxDelayInterval;
    // Only move the watermark forward, never backward
    if (potentialWM > lastEmittedWM) {
        lastEmittedWM = potentialWM;
    }
    return new Watermark(lastEmittedWM);
}

@Override
public final long extractTimestamp(StockTrade stockTrade, long previousElementTimestamp) {
    // The timestamp is used in whatever unit getTime() delivers; no conversion is applied
    BigDecimal bd = new BigDecimal(stockTrade.getTime());
    long timestamp = bd.longValue();
    // Remember the maximum timestamp seen so far
    if (timestamp > currentMaxTimestamp) {
        currentMaxTimestamp = timestamp;
    }
    return timestamp;
}
I used this example as a template.
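For comparison, here is a minimal plain-Java sketch of the same bounded-delay bookkeeping with every quantity kept in milliseconds. It is not the Flink interface itself: `BoundedDelayTracker` and its methods are hypothetical names. The seconds-to-milliseconds conversion assumes the source timestamps are epoch seconds, which would need to be checked against the real `StockTrade` data.

```java
// Hypothetical helper mirroring the extractor's logic, with all values in milliseconds.
final class BoundedDelayTracker {
    private final long maxDelayMs;
    private long currentMaxTimestampMs;
    private long lastEmittedWatermarkMs = Long.MIN_VALUE;

    BoundedDelayTracker(long maxDelayMs) {
        if (maxDelayMs < 0) {
            throw new IllegalArgumentException("maxDelayMs must be positive or 0");
        }
        this.maxDelayMs = maxDelayMs;
        // Offset the initial value so the subtraction in currentWatermark() cannot underflow.
        this.currentMaxTimestampMs = Long.MIN_VALUE + maxDelayMs;
    }

    // Converts an (assumed) epoch-seconds value to milliseconds and tracks the maximum.
    long observeSeconds(long epochSeconds) {
        long timestampMs = epochSeconds * 1000L;
        if (timestampMs > currentMaxTimestampMs) {
            currentMaxTimestampMs = timestampMs;
        }
        return timestampMs;
    }

    // Watermark = highest timestamp seen minus the allowed delay; it never moves backward.
    long currentWatermark() {
        long candidate = currentMaxTimestampMs - maxDelayMs;
        if (candidate > lastEmittedWatermarkMs) {
            lastEmittedWatermarkMs = candidate;
        }
        return lastEmittedWatermarkMs;
    }
}
```

With a 30,000 ms delay, observing a trade at second 100 yields a watermark of 70,000 ms, and a late trade at second 90 leaves that watermark unchanged.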