6

我正在使用 AWS Kinesis 客户端库

我需要一种在部署期间关闭 Kinesis Worker 线程的方法,以便我停在检查点而不是processRecords().

我看到一个关闭布尔值存在,Worker.java但它是私有的。

我需要的原因是检查点和幂等性对我来说至关重要,我不想在批处理中间终止该进程。

[编辑]

感谢@CaptainMurphy,我注意到Worker.java公开shutdown()方法可以安全地关闭工作人员和LeaseCoordinator. 它不做的是shutdown()IRecordProcessor. 它突然终止,IRecordProcessor而不用担心状态。

我确实理解 KCL 不保证检查点之间的幂等性,开发人员应该使设计容错,但我觉得无论如何都IRecordProcessor应该在停止之前正确关闭检查LeaseCoordinator点。

4

2 回答 2

2

当您在 Worker 上调用 shutdown 时,确实会调用 Record Processor 关闭方法。您可以从 ShutdownTask 类追溯它,该类由 ShardConsumer 类创建,该类由 Worker 关闭。

因此,您可以通过侦听关闭调用在最后收到的记录处检查点,在最后处理的值的迭代点将最后一个序列接收函数传递给检查点。例如,在您覆盖的 processRecords() 中:

for(Record currRecord : records)
{
    someProcessSingleRecordMethod(currRecord)
    if(shutdown) 
    { 
        checkpointer.checkpoint(currRecord.getSequenceNumber()); 
        return; 
    } 
}

您的关闭方法将关闭标志设置为 true。

请注意,在非正常关闭(例如实例终止)的情况下,以“至少一次”方式设计 Kinesis 应用程序仍然是最佳实践。“仅一次”接收和处理可能不是 Kinesis 的好用例。

于 2015-05-04T10:53:07.020 回答
2

由于版本1.7.1(参见下面的注释)应用程序可以请求正常关闭,并且实现 IShutdownNotificationAware 的记录处理器将有机会在关闭之前进行检查点。

  • 确保记录处理器IShutdownNotificationAware除了接口之一之外还实现IRecordProcessor接口。shutdownRequested(IRecordProcessorCheckpointer checkpointer)在方法中调用检查点。注意 - shutdownIRecordProcessor 的方法只有在关闭原因为 TERMINATE时才应调用检查点
  • 在应用程序关闭时启动工作人员关闭过程

    Future<Void> shutdown = worker.requestShutdown();
    shutdown.get(); // wait for shutdown complete
    

PS:版本1.7.4之前的 Kinesis Client包含阻止正确关闭的竞争条件。所以请使用 1.7.4 或更高版本。

于 2017-03-01T10:29:41.123 回答