对于AWS Kinesis的KCL Java库,如何使用requestShutdown和shutdown来执行正常关闭
我正在尝试使用Java中的KCL库的新function来为AWS Kinesis通过注册shutdown hook来优雅地关闭以停止所有记录处理器,然后优雅地停止工作。 新库提供了一个新的接口,需要实现记录处理器。 但它是如何被调用的?
尝试首先调用worker.requestShutdown()然后调用worker.shutdown()并且它可以正常工作。 但是它是否有任何预期的使用方式。 那么使用它们有什么用处,它的好处是什么?
开始消费者
正如您可能知道的那样,当您创建一个Worker
,它
1)在dynamodb中创建消费者偏移表
2)按照配置的时间间隔创建租赁, 安排租赁接受者和租赁续订者
如果您有两个分区,则同一个dynamodb表中将有两条记录,这意味着分区需要租约。
例如。
{ "checkpoint": "TRIM_HORIZON", "checkpointSubSequenceNumber": 0, "leaseCounter": 38, "leaseKey": "shardId-000000000000", "leaseOwner": "ComponentTest_Consumer_With_Two_Partitions_Consumer_192.168.1.83", "ownerSwitchesSinceCheckpoint": 0 } { "checkpoint": "49570828493343584144205257440727957974505808096533676050", "checkpointSubSequenceNumber": 0, "leaseCounter": 40, "leaseKey": "shardId-000000000001", "leaseOwner": "ComponentTest_Consumer_With_Two_Partitions_Consumer_192.168.1.83", "ownerSwitchesSinceCheckpoint": 0 }
- 租赁协调员ScheduledExecutorService(称为
leaseCoordinatorThreadPool
)负责接收和续订租约的时间表
3)然后,对于流中的每个分区, Worker
创建一个内部PartitionConsumer ,它实际上获取事件 ,并调度到RecordProcessor#processRecords
。 请参阅ProcessTask #call
4)关于你的问题,你必须将IRecordProcessorFactory
impl注册到worker
,这将为每个PartitionConsumer
提供一个ProcessorFactoryImpl
。
例如。 看这里的例子,这可能会有所帮助
KinesisClientLibConfiguration streamConfig = new KinesisClientLibConfiguration( "consumerName", "streamName", getAuthProfileCredentials(), "consumerName-" + "consumerInstanceId") .withKinesisClientConfig(getHttpConfiguration()) .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON); // "TRIM_HORIZON" = from the tip of the stream Worker consumerWorker = new Worker.Builder() .recordProcessorFactory(new DavidsEventProcessorFactory()) .config(streamConfig) .dynamoDBClient(new DynamoDB(new AmazonDynamoDBClient(getAuthProfileCredentials(), getHttpConfiguration()))) .build(); public class DavidsEventProcessorFactory implements IRecordProcessorFactory { private Logger logger = LogManager.getLogger(DavidsEventProcessorFactory.class); @Override public IRecordProcessor createProcessor() { logger.info("Creating an EventProcessor."); return new DavidsEventPartitionProcessor(); } } class DavidsEventPartitionProcessor implements IRecordProcessor { private Logger logger = LogManager.getLogger(DavidsEventPartitionProcessor.class); //TODO add consumername ? private String partitionId; private ShutdownReason RE_PARTITIONING = ShutdownReason.TERMINATE; public KinesisEventPartitionProcessor() { } @Override public void initialize(InitializationInput initializationInput) { this.partitionId = initializationInput.getShardId(); logger.info("Initialised partition {} for streaming.", partitionId); } @Override public void processRecords(ProcessRecordsInput recordsInput) { recordsInput.getRecords().forEach(nativeEvent -> { String eventPayload = new String(nativeEvent.getData().array()); logger.info("Processing an event {} : {}" , nativeEvent.getSequenceNumber(), eventPayload); //update offset after configured amount of retries try { recordsInput.getCheckpointer().checkpoint(); logger.debug("Persisted the consumer offset to {} for partition {}", nativeEvent.getSequenceNumber(), partitionId); } catch (InvalidStateException e) { logger.error("Cannot update consumer offset to the DynamoDB table.", e); e.printStackTrace(); } catch (ShutdownException e) { logger.error("Consumer Shutting down", e); e.printStackTrace(); } }); } @Override public void shutdown(ShutdownInput shutdownReason) { logger.debug("Shutting down event processor for {}", partitionId); if(shutdownReason.getShutdownReason() == RE_PARTITIONING) { try { shutdownReason.getCheckpointer().checkpoint(); } catch (InvalidStateException e) { logger.error("Cannot update consumer offset to the DynamoDB table.", e); e.printStackTrace(); } catch (ShutdownException e) { logger.error("Consumer Shutting down", e); e.printStackTrace(); } } } }
//然后开始消费者
consumerWorker.run();
阻止消费者
现在,当你想要停止你的Consumer实例( Worker
)时,你不需要对每个PartitionConsumer
做太多的处理,一旦你要求它关闭,这将由Worker
。
-
shutdown
,它要求leaseCoordinatorThreadPool
停止,负责续订和租赁,并等待终止。 -
另一方面,
requestShutdown
取消租赁者, 并通知PartitionConsumer
关闭。
requestShutdown
更重要的是,如果你想在RecordProcessor
上得到通知,那么你也可以实现IShutdownNotificationAware
。 这种情况下,当RecordProcessor
正在处理事件但工作人员即将关闭时遇到竞争条件时,您仍应该能够提交偏移量然后关闭。
requestShutdown
返回ShutdownFuture
,然后回调worker.shutdown
您必须在RecordProcessor
上实现以下方法才能在requestShutdown
上收到通知,
class DavidsEventPartitionProcessor implements IRecordProcessor, IShutdownNotificationAware { private String partitionId; // few implementations @Override public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { logger.debug("Shutdown requested for {}", partitionId); } }
但是如果你在通知之前松开租约那么它可能不会被调用。
总结你的问题
新库提供了一个新的接口,需要实现记录处理器。 但它是如何被调用的?
- 实现一个
IRecordProcessorFactory
和IRecordProcessor
。 - 然后将
RecordProcessorFactory
到您的Worker
。
尝试首先调用worker.requestShutdown()然后调用worker.shutdown()并且它可以正常工作。 但它是否有任何预期的使用方式?
您应该使用requestShutdown()
进行正常关闭 ,这将处理竞争条件。 它是在kinesis-client-1.7.1中引入的