0

我正在尝试构建一个实时管道来处理来自 Kinesis 流的 JSON 事件并将它们聚合到固定窗口上 - 例如。每 5 分钟计算一次平均值。记录如下所示:

{
   "value": 15.01,
   "timestamp": "16 Feb 2022 20:58:05 GMT"
}

我在 Flink Runner 上使用 Apache Beam 并创建了一个管道,该管道将根据事件时间处理事件,如下所示:

pipeline
        .apply(
            "KDS source",
            KinesisIO.read()
                .withStreamName(options.getInputStreamName())
                .withAWSClientsProvider(new DefaultAWSClientsProvider(options))
                .withRequestRecordsLimit(1000)
                .withCustomWatermarkPolicy(
                    WatermarkPolicyFactory.withCustomWatermarkPolicy(
                            WatermarkParameters.create().withTimestampFn(record -> {
                                Optional<Record> maybeRecord = Utils.maybeGetRecordFromKinesisByteArray(record.getDataAsBytes());
                                return maybeRecord.map(Util::jodaInstantEventTimeFromRecord).orElse(null);
                            }))
                .withInitialPositionInStream(InitialPositionInStream.LATEST))

        .apply("Kinesis Record To Record", ParDo.of(new KinesisRecordToRecordTransform()))
        .apply("Timestamps", WithTimestamps.of(Utils::jodaInstantEventTimeFromRecord));

管道的最后一步应该是一个窗口函数,但为简单起见将其省略。代码原样引发异常:

Cannot output with timestamp 2022-02-16T14:47:25.688Z. Output timestamps must be no earlier than the timestamp of the current input (2022-02-16T14:47:25.725Z) minus the allowed skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.

通过查看 KinesisIO 代码,没有设置时间戳功能的功能,但记录的进入时间用作时间戳。与 KafkaIO 或 AvroIo 相比,您可以覆盖从记录中提取事件时间的方式,而 KinesisIo 则没有这样的事情。

有谁知道这个问题的任何解决方法?

谢谢


KafkaIO - Apache Beam 的相关问题:使用 Withtimestamp 分配事件时间时出错

4

0 回答 0