我正在使用 AWS DynamoStream,他的 API 基于 AWS KCL。
如果我收到了我未能处理的记录,我希望这些记录稍后可用,以便重新处理它们。例如,我试图将它们保存到远程数据库,但有时会遇到网络问题。
我的问题是:
- 我可以以某种方式使用检查点来表明我没有处理记录吗?
- 我应该避免执行 Checkpointer.checkpoint() 吗?如果我在下一次调用中仍然使用它会有什么影响
processRecords
吗? - 我可能会为此目的使用任何例外吗?
我正在使用 AWS DynamoStream,他的 API 基于 AWS KCL。
如果我收到了我未能处理的记录,我希望这些记录稍后可用,以便重新处理它们。例如,我试图将它们保存到远程数据库,但有时会遇到网络问题。
我的问题是:
processRecords
吗?KCL 不提供这种内置的重新驱动机制 - 一旦 processRecords 返回(无论是抛出异常还是成功返回),它就会认为这些记录已处理并继续前进,即使在内部它失败了。
如果您想稍后重新处理一些记录,您需要捕获这些记录并将它们存储在其他地方以便稍后重新处理尝试(明显的警告是它们不会从流的其余部分按顺序处理)。
最简单的解决方案是让您的记录处理器逻辑识别失败的记录(在返回到 KCL 之前)并将它们发送到 SQS 队列。然后,记录不会丢失,它们可以在您空闲时进行处理(或由另一个消耗 SQS 队列的进程,可能使用 DLQ 机制来处理重复的失败/放弃场景)。
要回答您的具体问题: