1

我正在使用 AWS DynamoStream,他的 API 基于 AWS KCL。

如果我收到了我未能处理的记录,我希望这些记录稍后可用,以便重新处理它们。例如,我试图将它们保存到远程数据库,但有时会遇到网络问题。

我的问题是:

  1. 我可以以某种方式使用检查点来表明我没有处理记录吗?
  2. 我应该避免执行 Checkpointer.checkpoint() 吗?如果我在下一次调用中仍然使用它会有什么影响processRecords吗?
  3. 我可能会为此目的使用任何例外吗?
4

1 回答 1

1

KCL 不提供这种内置的重新驱动机制 - 一旦 processRecords 返回(无论是抛出异常还是成功返回),它就会认为这些记录已处理并继续前进,即使在内部它失败了。

如果您想稍后重新处理一些记录,您需要捕获这些记录并将它们存储在其他地方以便稍后重新处理尝试(明显的警告是它们不会从流的其余部分按顺序处理)。

最简单的解决方案是让您的记录处理器逻辑识别失败的记录(在返回到 KCL 之前)并将它们发送到 SQS 队列。然后,记录不会丢失,它们可以在您空闲时进行处理(或由另一个消耗 SQS 队列的进程,可能使用 DLQ 机制来处理重复的失败/放弃场景)。

要回答您的具体问题:

  1. 没有。检查点只是说“我已经走了这么远,不要看检查点之前的东西”
  2. 把检查点想象成一个全局状态。一旦设置好,它就会包含之前的所有内容。您也不需要检查对 processRecords 的每次调用 - 您可以每 X 秒或每 Y 条记录等执行一次。
  3. 不在 KCL 级别 - 您可以在内部使用特殊的异常类型,并在您返回 Kinesis 之前在您的 processRecords 的外部级别捕获它。或者您可以捕获所有异常 - 这取决于您以及您希望重新驱动逻辑的具体程度。
于 2017-11-05T18:10:01.033 回答