我正在尝试构建一个实时管道来处理来自 Kinesis 流的 JSON 事件并将它们聚合到固定窗口上 - 例如。每 5 分钟计算一次平均值。记录如下所示:
{
"value": 15.01,
"timestamp": "16 Feb 2022 20:58:05 GMT"
}
我在 Flink Runner 上使用 Apache Beam 并创建了一个管道,该管道将根据事件时间处理事件,如下所示:
pipeline
.apply(
"KDS source",
KinesisIO.read()
.withStreamName(options.getInputStreamName())
.withAWSClientsProvider(new DefaultAWSClientsProvider(options))
.withRequestRecordsLimit(1000)
.withCustomWatermarkPolicy(
WatermarkPolicyFactory.withCustomWatermarkPolicy(
WatermarkParameters.create().withTimestampFn(record -> {
Optional<Record> maybeRecord = Utils.maybeGetRecordFromKinesisByteArray(record.getDataAsBytes());
return maybeRecord.map(Util::jodaInstantEventTimeFromRecord).orElse(null);
}))
.withInitialPositionInStream(InitialPositionInStream.LATEST))
.apply("Kinesis Record To Record", ParDo.of(new KinesisRecordToRecordTransform()))
.apply("Timestamps", WithTimestamps.of(Utils::jodaInstantEventTimeFromRecord));
管道的最后一步应该是一个窗口函数,但为简单起见将其省略。代码原样引发异常:
Cannot output with timestamp 2022-02-16T14:47:25.688Z. Output timestamps must be no earlier than the timestamp of the current input (2022-02-16T14:47:25.725Z) minus the allowed skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.
通过查看 KinesisIO 代码,没有设置时间戳功能的功能,但记录的进入时间用作时间戳。与 KafkaIO 或 AvroIo 相比,您可以覆盖从记录中提取事件时间的方式,而 KinesisIo 则没有这样的事情。
有谁知道这个问题的任何解决方法?
谢谢