对于AWS Kinesis的KCL Java库,如何使用requestShutdown和shutdown来执行正常关闭

我正在尝试使用Java中的KCL库的新function来为AWS Kinesis通过注册shutdown hook来优雅地关闭以停止所有记录处理器,然后优雅地停止工作。 新库提供了一个新的接口,需要实现记录处理器。 但它是如何被调用的?

尝试首先调用worker.requestShutdown()然后调用worker.shutdown()并且它可以正常工作。 但是它是否有任何预期的使用方式。 那么使用它们有什么用处,它的好处是什么?

开始消费者

正如您可能知道的那样,当您创建一个Worker ,它

1)在dynamodb中创建消费者偏移表

2)按照配置的时间间隔创建租赁, 安排租赁接受者和租赁续订者

如果您有两个分区,则同一个dynamodb表中将有两条记录,这意味着分区需要租约。

例如。

 { "checkpoint": "TRIM_HORIZON", "checkpointSubSequenceNumber": 0, "leaseCounter": 38, "leaseKey": "shardId-000000000000", "leaseOwner": "ComponentTest_Consumer_With_Two_Partitions_Consumer_192.168.1.83", "ownerSwitchesSinceCheckpoint": 0 } { "checkpoint": "49570828493343584144205257440727957974505808096533676050", "checkpointSubSequenceNumber": 0, "leaseCounter": 40, "leaseKey": "shardId-000000000001", "leaseOwner": "ComponentTest_Consumer_With_Two_Partitions_Consumer_192.168.1.83", "ownerSwitchesSinceCheckpoint": 0 } 
  • 租赁协调员ScheduledExecutorService(称为leaseCoordinatorThreadPool )负责接收和续订租约的时间表

3)然后,对于流中的每个分区, Worker创建一个内部PartitionConsumer ,它实际上获取事件 ,并调度到RecordProcessor#processRecords 。 请参阅ProcessTask #call

4)关于你的问题,你必须将IRecordProcessorFactory impl注册到worker ,这将为每个PartitionConsumer提供一个ProcessorFactoryImpl

例如。 看这里的例子,这可能会有所帮助

 KinesisClientLibConfiguration streamConfig = new KinesisClientLibConfiguration( "consumerName", "streamName", getAuthProfileCredentials(), "consumerName-" + "consumerInstanceId") .withKinesisClientConfig(getHttpConfiguration()) .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON); // "TRIM_HORIZON" = from the tip of the stream Worker consumerWorker = new Worker.Builder() .recordProcessorFactory(new DavidsEventProcessorFactory()) .config(streamConfig) .dynamoDBClient(new DynamoDB(new AmazonDynamoDBClient(getAuthProfileCredentials(), getHttpConfiguration()))) .build(); public class DavidsEventProcessorFactory implements IRecordProcessorFactory { private Logger logger = LogManager.getLogger(DavidsEventProcessorFactory.class); @Override public IRecordProcessor createProcessor() { logger.info("Creating an EventProcessor."); return new DavidsEventPartitionProcessor(); } } class DavidsEventPartitionProcessor implements IRecordProcessor { private Logger logger = LogManager.getLogger(DavidsEventPartitionProcessor.class); //TODO add consumername ? private String partitionId; private ShutdownReason RE_PARTITIONING = ShutdownReason.TERMINATE; public KinesisEventPartitionProcessor() { } @Override public void initialize(InitializationInput initializationInput) { this.partitionId = initializationInput.getShardId(); logger.info("Initialised partition {} for streaming.", partitionId); } @Override public void processRecords(ProcessRecordsInput recordsInput) { recordsInput.getRecords().forEach(nativeEvent -> { String eventPayload = new String(nativeEvent.getData().array()); logger.info("Processing an event {} : {}" , nativeEvent.getSequenceNumber(), eventPayload); //update offset after configured amount of retries try { recordsInput.getCheckpointer().checkpoint(); logger.debug("Persisted the consumer offset to {} for partition {}", nativeEvent.getSequenceNumber(), partitionId); } catch (InvalidStateException e) { logger.error("Cannot update consumer offset to the DynamoDB table.", e); e.printStackTrace(); } catch (ShutdownException e) { logger.error("Consumer Shutting down", e); e.printStackTrace(); } }); } @Override public void shutdown(ShutdownInput shutdownReason) { logger.debug("Shutting down event processor for {}", partitionId); if(shutdownReason.getShutdownReason() == RE_PARTITIONING) { try { shutdownReason.getCheckpointer().checkpoint(); } catch (InvalidStateException e) { logger.error("Cannot update consumer offset to the DynamoDB table.", e); e.printStackTrace(); } catch (ShutdownException e) { logger.error("Consumer Shutting down", e); e.printStackTrace(); } } } } 

//然后开始消费者

 consumerWorker.run(); 

阻止消费者

现在,当你想要停止你的Consumer实例( Worker )时,你不需要对每个PartitionConsumer做太多的处理,一旦你要求它关闭,这将由Worker

  • shutdown ,它要求 leaseCoordinatorThreadPool停止,负责续订和租赁,并等待终止。

  • 另一方面, requestShutdown取消租赁者, 通知PartitionConsumer关闭。

requestShutdown更重要的是,如果你想在RecordProcessor上得到通知,那么你也可以实现IShutdownNotificationAware 。 这种情况下,当RecordProcessor正在处理事件但工作人员即将关闭时遇到竞争条件时,您仍应该能够提交偏移量然后关闭。

requestShutdown返回ShutdownFuture ,然后回调worker.shutdown

您必须在RecordProcessor上实现以下方法才能在requestShutdown上收到通知,

 class DavidsEventPartitionProcessor implements IRecordProcessor, IShutdownNotificationAware { private String partitionId; // few implementations @Override public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { logger.debug("Shutdown requested for {}", partitionId); } } 

但是如果你在通知之前松开租约那么它可能不会被调用。

总结你的问题

新库提供了一个新的接口,需要实现记录处理器。 但它是如何被调用的?

  • 实现一个IRecordProcessorFactoryIRecordProcessor
  • 然后将RecordProcessorFactory到您的Worker

尝试首先调用worker.requestShutdown()然后调用worker.shutdown()并且它可以正常工作。 但它是否有任何预期的使用方式?

您应该使用requestShutdown()进行正常关闭 ,这将处理竞争条件。 它是在kinesis-client-1.7.1中引入的

Interesting Posts