2

我想了解何时从 worker 调用 IRecordProcessor 的 processRecords 方法。如果我之前对 processRecords 的调用尚未完成,worker 会调用下一个 processRecords 吗?工作人员会开始从 kinesis 获取新记录还是等到当前记录完成执行。

基本上我想等待很长时间,如果 processRecords 在将记录保存在外部数据库中时遇到一些异常,因为 db 已关闭或其他一些错误。所以想确认如果工人在早些时候完成处理之前不开始获取新记录,那么不会有任何问题?

4

1 回答 1

1

摘自其他问题:

应用程序(在 KCL 的帮助下)将继续在后台轮询“Shard Iterator”,因此您将在新数据到来时收到通知。

来源:https ://stackoverflow.com/a/35582161/1622134

而且,“worker”是指应用程序中的“Worker”线程;这是一个可运行的。

每个分片仅由一个 KCL 工作人员处理,并且只有一个对应的记录处理器,因此您永远不需要多个实例来处理一个分片。请参阅 KCL 源代码中的Worker.java类。

来源:https ://stackoverflow.com/a/34509567/1622134

要回答您的问题,您可以在您的processRecords实现中使用它。在处理记录时,当且仅当 try 部分成功时,使用 try-catch 块并将检查点写入 DynamoDB。那样; 如果写入外部数据库时出现错误,您将不会丢失记录并在重新启动时。您还应该将这些记录数据(不能插入到数据库中)保存到另一个地方以便以后处理。

另请参阅此答案:https ://stackoverflow.com/a/32517002/1622134

于 2017-01-23T08:56:56.043 回答