0

我正在尝试使用 hazelcast jet 对文件中的连续数字流求和

pipe
    .drawFrom(Sources.fileWatcher)<dir>))
    .map(s->Integer.parseInt(s))
    .addTimestamps()
    .window(WindowDefinition.sliding(10000,1000))
    .aggregate(AggregateOperations.summingDouble(x->x))
    .drainTo(Sinks.logger());

几个问题

  1. 它没有给出预期的输出,我的期望是文件中出现新数字时,它应该将其添加到现有总和中
  2. 为此,为什么我需要提供窗口和addTimestamp方法,我只需要做无限流的总和
  3. 我们如何实现容错,即如果服务器重新启动,它将保存聚合结果,当它出现时,它将从最后计算的总和中聚合?
  4. 如果服务器关闭并且当服务器启动时现在很少有数字进入文件,它会从服务器关闭时的最后一点读取还是会在服务器关闭时错过数字并且只会读取它在之后获得的数字服务器已启动。
4

1 回答 1

0

对 Q1 和 Q2 的回答:您正在寻找rollingAggregate,您不需要时间戳或窗口。

pipe
    .drawFrom(Sources.fileWatcher(<dir>))
    .rollingAggregate(AggregateOperations.summingDouble(Double::parseDouble))
    .drainTo(Sinks.logger());

对 Q3 和 Q4 的回答:fileWatcher源不是容错的。原因是它读取本地文件,当成员死亡时,本地文件无论如何都将不可用。当作业重新启动时,它将从当前位置开始读取,并且会错过作业停止时添加的数字。

此外,由于您使用全局聚合,所有文件中的数据将被路由到单个集群成员,其他成员将处于空闲状态。

于 2019-01-22T16:10:53.933 回答