Kinesis:关闭工人的最佳/安全方法是什么?

我正在使用AWS Kinesis客户端库 。

我需要一种方法在部署期间关闭Kinesis Worker线程,这样我就停在检查点而不是在processRecords()的中间。

我在Worker.java看到了一个关闭boolean,但它是私有的。

我需要的原因是检查点和幂等性对我来说至关重要,我不想在批处理中间杀死进程。

[编辑]

感谢@CaptainMurphy,我注意到Worker.java公开了shutdown()方法,它安全地关闭了worker和LeaseCoordinator 。 它没有做的是在IRecordProcessor调用shutdown()任务。 它突然终止了IRecordProcessor而不用担心状态。

我确实理解KCL不保证检查点之间的幂等性,并且开发人员应该使设计容错,但我觉得在LeaseCoordinator停止之前, IRecordProcessor应该正确关闭, LeaseCoordinator

从版本1.7.1 (请参阅下面的注释)开始,应用程序可以请求正常关闭,并且在关闭之前,实现IShutdownNotificationAware的记录处理器将有机会检查点。

  • 确保记录处理器除了IShutdownNotificationAware接口之外还实现了IRecordProcessor接口。 在shutdownRequested(IRecordProcessorCheckpointer checkpointer)方法中调用检查shutdownRequested(IRecordProcessorCheckpointer checkpointer) 。 注意 – 只有当关闭原因是TERMINATE时,IRecordProcessor的shutdown方法才应该调用checkpoint
  • 在应用程序关闭时启动工作程序关闭

     Future shutdown = worker.requestShutdown(); shutdown.get(); // wait for shutdown complete 

PS:版本1.7.4之前的Kinesis客户端包含阻止正确关闭的竞争条件。 因此,请使用1.7.4或更高版本。

当您在Worker上调用shutdown时,确实会调用Record Processor shutdown方法。 您可以从ShutdownTask类中追溯它,该类由ShardConsumer类创建,该类由Worker关闭。

因此,您可以通过侦听关闭调用来检查最后收到的记录,并在最后一个处理值的迭代点将最后一个序列接收函数传递给checkpointer。 例如,在你重写的processRecords()中:

 for(Record currRecord : records) { someProcessSingleRecordMethod(currRecord) if(shutdown) { checkpointer.checkpoint(currRecord.getSequenceNumber()); return; } } 

shutdown方法将shutdown标志设置为true。

请注意,在非正常关闭(例如实例终止)的情况下,Kinesis应用程序仍然是“至少一次”设计的最佳实践。 接收和处理“一次”可能不是Kinesis的一个好用例。