我正在运行一个管道,从Kafka
主题源和接收器到IMap
. 每次我写一个,我都会遇到这些方法withIngestionTimestamps()
,withoutTimestamps()
想知道它们有什么用?我了解它的所有关于为事件添加时间的来源。问题是我如何使用它?我没有看到任何从事件中获取时间戳的方法?
我的 IMap 有可能被重复值填充。如果我可以使用 withIngestionTimestamps() 方法来评估最新记录并丢弃旧记录?
我正在运行一个管道,从Kafka
主题源和接收器到IMap
. 每次我写一个,我都会遇到这些方法withIngestionTimestamps()
,withoutTimestamps()
想知道它们有什么用?我了解它的所有关于为事件添加时间的来源。问题是我如何使用它?我没有看到任何从事件中获取时间戳的方法?
我的 IMap 有可能被重复值填充。如果我可以使用 withIngestionTimestamps() 方法来评估最新记录并丢弃旧记录?
Jet 使用事件时间戳来正确应用窗口。它必须决定哪个事件属于哪个窗口以及何时关闭窗口并发出其聚合结果。时间戳作为元数据存在于事件中,不会向用户公开。
但是,如果您想应用引用挂钟时间的逻辑,您可以随时调用System.currentTimeMillis()
以根据显式存储在 IMap 值中的时间戳检查它。这相当于使用处理时间,这与Jet 应用的摄取时间非常相似。摄取时间只是在管道的源顶点处有效的处理时间,因此在接收器顶点处应用处理时间与此略有不同,并且具有相同的实际属性。
Jet 在幕后管理事件时间戳,它只对处理器可见。例如,窗口聚合将使用时间戳。
如果您想在代码中查看时间戳,则必须将其包含在您的项目类型中。您必须不使用源中的时间戳,使用map
运算符添加摄取时间戳并让 Jet 知道它:
Pipeline p = Pipeline.create();
p.drawFrom(KafkaSources.kafka(...))
.withoutTimestamps()
.map(t -> tuple2(System.currentTimeMillis(), t))
.addTimestamps(Tuple2::f0, 2000)
.drainTo(Sinks.logger());
我用allowedLag
了2000毫秒。这样做的原因是时间戳将添加到分配它们的顶点下游的顶点中。流合并可以在那里发生,并且需要考虑内部偏差。例如,它应该考虑最长的预期 GC 暂停或网络延迟。请参阅方法中的注释addTimestamps
。