我正在尝试使用 hazelcast jet 对文件中的连续数字流求和
pipe
.drawFrom(Sources.fileWatcher)<dir>))
.map(s->Integer.parseInt(s))
.addTimestamps()
.window(WindowDefinition.sliding(10000,1000))
.aggregate(AggregateOperations.summingDouble(x->x))
.drainTo(Sinks.logger());
几个问题
- 它没有给出预期的输出,我的期望是文件中出现新数字时,它应该将其添加到现有总和中
- 为此,为什么我需要提供窗口和
addTimestamp
方法,我只需要做无限流的总和 - 我们如何实现容错,即如果服务器重新启动,它将保存聚合结果,当它出现时,它将从最后计算的总和中聚合?
- 如果服务器关闭并且当服务器启动时现在很少有数字进入文件,它会从服务器关闭时的最后一点读取还是会在服务器关闭时错过数字并且只会读取它在之后获得的数字服务器已启动。