我正在尝试使用 Java 中 KCL 库的新功能 for AWS Kinesis 通过注册关闭挂钩来优雅地关闭所有记录处理器,然后优雅地停止工作人员。新库提供了需要实现记录处理器的新接口。但是它是如何被调用的呢?
尝试先调用 worker.requestShutdown() 然后调用 worker.shutdown() 并且它可以工作。但它是否有任何使用它的预期方式。那么同时使用它们有什么用,它的好处是什么?
我正在尝试使用 Java 中 KCL 库的新功能 for AWS Kinesis 通过注册关闭挂钩来优雅地关闭所有记录处理器,然后优雅地停止工作人员。新库提供了需要实现记录处理器的新接口。但是它是如何被调用的呢?
尝试先调用 worker.requestShutdown() 然后调用 worker.shutdown() 并且它可以工作。但它是否有任何使用它的预期方式。那么同时使用它们有什么用,它的好处是什么?
您可能知道,当您创建一个 时Worker
,它
1)在 dynamodb 中创建消费者偏移量表
2)在配置的时间间隔创建租约,安排租约接受者和租约更新者
如果您有两个分区,那么同一个 dynamodb 表中将有两条记录,这意味着分区需要租用。
例如。
{
"checkpoint": "TRIM_HORIZON",
"checkpointSubSequenceNumber": 0,
"leaseCounter": 38,
"leaseKey": "shardId-000000000000",
"leaseOwner": "ComponentTest_Consumer_With_Two_Partitions_Consumer_192.168.1.83",
"ownerSwitchesSinceCheckpoint": 0
}
{
"checkpoint": "49570828493343584144205257440727957974505808096533676050",
"checkpointSubSequenceNumber": 0,
"leaseCounter": 40,
"leaseKey": "shardId-000000000001",
"leaseOwner": "ComponentTest_Consumer_With_Two_Partitions_Consumer_192.168.1.83",
"ownerSwitchesSinceCheckpoint": 0
}
leaseCoordinatorThreadPool
)负责获取和更新租约的时间表3)然后对于流中的每个分区,Worker
创建一个内部PartitionConsumer,它实际上获取事件,并调度到您的RecordProcessor#processRecords
. 参见ProcessTask#call
4)关于您的问题,您必须将您的IRecordProcessorFactory
impl 注册到worker
,这将为ProcessorFactoryImpl
每个PartitionConsumer
.
KinesisClientLibConfiguration streamConfig = new KinesisClientLibConfiguration(
"consumerName", "streamName", getAuthProfileCredentials(), "consumerName-" + "consumerInstanceId")
.withKinesisClientConfig(getHttpConfiguration())
.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON); // "TRIM_HORIZON" = from the tip of the stream
Worker consumerWorker = new Worker.Builder()
.recordProcessorFactory(new DavidsEventProcessorFactory())
.config(streamConfig)
.dynamoDBClient(new DynamoDB(new AmazonDynamoDBClient(getAuthProfileCredentials(), getHttpConfiguration())))
.build();
public class DavidsEventProcessorFactory implements IRecordProcessorFactory {
private Logger logger = LogManager.getLogger(DavidsEventProcessorFactory.class);
@Override
public IRecordProcessor createProcessor() {
logger.info("Creating an EventProcessor.");
return new DavidsEventPartitionProcessor();
}
}
class DavidsEventPartitionProcessor implements IRecordProcessor {
private Logger logger = LogManager.getLogger(DavidsEventPartitionProcessor.class);
//TODO add consumername ?
private String partitionId;
private ShutdownReason RE_PARTITIONING = ShutdownReason.TERMINATE;
public KinesisEventPartitionProcessor() {
}
@Override
public void initialize(InitializationInput initializationInput) {
this.partitionId = initializationInput.getShardId();
logger.info("Initialised partition {} for streaming.", partitionId);
}
@Override
public void processRecords(ProcessRecordsInput recordsInput) {
recordsInput.getRecords().forEach(nativeEvent -> {
String eventPayload = new String(nativeEvent.getData().array());
logger.info("Processing an event {} : {}" , nativeEvent.getSequenceNumber(), eventPayload);
//update offset after configured amount of retries
try {
recordsInput.getCheckpointer().checkpoint();
logger.debug("Persisted the consumer offset to {} for partition {}",
nativeEvent.getSequenceNumber(), partitionId);
} catch (InvalidStateException e) {
logger.error("Cannot update consumer offset to the DynamoDB table.", e);
e.printStackTrace();
} catch (ShutdownException e) {
logger.error("Consumer Shutting down", e);
e.printStackTrace();
}
});
}
@Override
public void shutdown(ShutdownInput shutdownReason) {
logger.debug("Shutting down event processor for {}", partitionId);
if(shutdownReason.getShutdownReason() == RE_PARTITIONING) {
try {
shutdownReason.getCheckpointer().checkpoint();
} catch (InvalidStateException e) {
logger.error("Cannot update consumer offset to the DynamoDB table.", e);
e.printStackTrace();
} catch (ShutdownException e) {
logger.error("Consumer Shutting down", e);
e.printStackTrace();
}
}
}
}
// 然后启动一个消费者
consumerWorker.run();
现在,当你想停止你的 Consumer instance( Worker
) 时,你不需要对 each 做太多的处理,一旦你要求它关闭PartitionConsumer
它就会被处理。Worker
与shutdown
,它要求停止,leaseCoordinatorThreadPool
它负责更新和获取租约,并等待终止。
requestShutdown
另一方面取消租约接受者,并 通知PartitionConsumer
s 关闭。
更重要的requestShutdown
是,如果您想获得有关您的通知,RecordProcessor
那么您也可以实施IShutdownNotificationAware
。这样,当您RecordProcessor
正在处理一个事件但工作人员即将关闭时,如果出现竞争条件,您应该仍然能够提交您的偏移量然后关闭。
requestShutdown
返回 a ShutdownFuture
,然后回调worker.shutdown
您必须在您的设备上实施以下方法RecordProcessor
才能收到通知requestShutdown
,
class DavidsEventPartitionProcessor implements IRecordProcessor, IShutdownNotificationAware {
private String partitionId;
// few implementations
@Override
public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
logger.debug("Shutdown requested for {}", partitionId);
}
}
但是,如果您在通知之前松开租约,那么它可能不会被调用。
新库提供了需要实现记录处理器的新接口。但是它是如何被调用的呢?
IRecordProcessorFactory
和IRecordProcessor
。RecordProcessorFactory
连接到您的Worker
.尝试先调用 worker.requestShutdown() 然后调用 worker.shutdown() 并且它可以工作。但它有任何预期的使用方式吗?
您应该使用requestShutdown()
for graceful shutdown,这将处理竞争条件。它是在kinesis-client-1.7.1中引入的