3

我想将来自 AWS Kinesis 流的消息处理延迟一小时。我已将 KCL 使用者配置为每四分钟读取一批记录,检查每条记录的时间戳,如果任何记录不到一小时,则停止处理批次,无需检查点。我希望同一个消费者实例每四分钟重新读取一次相同的消息,直到整个批次足够老可以处理,然后对消费者进行检查点。然而,在实践中,消费者只读取一次消息,这意味着它们在准备好处理时被忽略并且不再读取。有没有办法配置消费者每次都重新读取上一个检查点的所有消息?

4

1 回答 1

0

我喜欢 AWS Kinesis Stream 开箱即用的类似功能(可能会延迟事件交付的配置)。否则,有一种方法可以延迟事件的处理,但会以浪费计算为代价。

使用 SQS(或 FIFO SQS,如果您关心事件排序)而不是 Kinesis,或者使用 Kinesis Stream 上的 AWS Lambda 将事件传输到 SQS。SQS 支持最多延迟 15 分钟的消息传递。由于您需要延迟为 60 分钟,因此您可以运行另一个 Lambda(或您自己的 SQS 消费者)来处理消息。在第一次向 Lambda(或您的 SQS 使用者)传递消息时,不要处理该消息,而只需将消息的可见性超时设置为 45 分钟(加起来是您所需的 60 分钟延迟)。仅在您第二次收到 SQS 消息后处理它。您可以检查消息之前已发送多少次,以决定是否要处理或跳过处理该消息。

于 2020-07-17T00:07:36.050 回答