Kinesis:关闭工人的最佳/安全方法是什么?
我正在使用AWS Kinesis客户端库 。
我需要一种方法在部署期间关闭Kinesis Worker线程,这样我就停在检查点而不是在processRecords()
的中间。
我在Worker.java
看到了一个关闭boolean,但它是私有的。
我需要的原因是检查点和幂等性对我来说至关重要,我不想在批处理中间杀死进程。
[编辑]
感谢@CaptainMurphy,我注意到Worker.java
公开了shutdown()
方法,它安全地关闭了worker和LeaseCoordinator
。 它没有做的是在IRecordProcessor
调用shutdown()
任务。 它突然终止了IRecordProcessor
而不用担心状态。
我确实理解KCL不保证检查点之间的幂等性,并且开发人员应该使设计容错,但我觉得在LeaseCoordinator
停止之前, IRecordProcessor
应该正确关闭, LeaseCoordinator
。
从版本1.7.1 (请参阅下面的注释)开始,应用程序可以请求正常关闭,并且在关闭之前,实现IShutdownNotificationAware的记录处理器将有机会检查点。
- 确保记录处理器除了
IShutdownNotificationAware
接口之外还实现了IRecordProcessor
接口。 在shutdownRequested(IRecordProcessorCheckpointer checkpointer)
方法中调用检查shutdownRequested(IRecordProcessorCheckpointer checkpointer)
。 注意 – 只有当关闭原因是TERMINATE时,IRecordProcessor的shutdown
方法才应该调用checkpoint -
在应用程序关闭时启动工作程序关闭
Future
shutdown = worker.requestShutdown(); shutdown.get(); // wait for shutdown complete
PS:版本1.7.4之前的Kinesis客户端包含阻止正确关闭的竞争条件。 因此,请使用1.7.4或更高版本。
当您在Worker上调用shutdown时,确实会调用Record Processor shutdown方法。 您可以从ShutdownTask类中追溯它,该类由ShardConsumer类创建,该类由Worker关闭。
因此,您可以通过侦听关闭调用来检查最后收到的记录,并在最后一个处理值的迭代点将最后一个序列接收函数传递给checkpointer。 例如,在你重写的processRecords()中:
for(Record currRecord : records) { someProcessSingleRecordMethod(currRecord) if(shutdown) { checkpointer.checkpoint(currRecord.getSequenceNumber()); return; } }
shutdown方法将shutdown标志设置为true。
请注意,在非正常关闭(例如实例终止)的情况下,Kinesis应用程序仍然是“至少一次”设计的最佳实践。 接收和处理“一次”可能不是Kinesis的一个好用例。