I have the following configuration for the checkpoint store:
public DynamoDbMetaDataStore dynamoDbMetaDataStore() {
    String url = consumerClientProperties.getDynamoDB().getUrl();
    final AmazonDynamoDBAsync amazonDynamoDB = AmazonDynamoDBAsyncClientBuilder.standard()
            .withEndpointConfiguration(new EndpointConfiguration(
                    url,
                    Regions.fromName(awsRegion).getName()))
            .withClientConfiguration(new ClientConfiguration()
                    .withMaxErrorRetry(consumerClientProperties.getDynamoDB().getRetries())
                    .withConnectionTimeout(consumerClientProperties.getDynamoDB().getConnectionTimeout()))
            .build();
    DynamoDbMetaDataStore dynamoDbMetaDataStore = new DynamoDbMetaDataStore(amazonDynamoDB, "consumer-client-library");
    return dynamoDbMetaDataStore;
}
public KinesisMessageDrivenChannelAdapter kinesisInboundChannelChannel(
        AmazonKinesis amazonKinesis, String[] streamNames) {
    KinesisMessageDrivenChannelAdapter adapter =
            new KinesisMessageDrivenChannelAdapter(amazonKinesis, streamNames);
    adapter.setConverter(null);
    adapter.setOutputChannel(kinesisReceiveChannel());
    adapter.setCheckpointStore(dynamoDbMetaDataStore());
    adapter.setCheckpointMode(CheckpointMode.manual);
    adapter.setListenerMode(ListenerMode.batch);
    return adapter;
}
When the consumer starts, it looks up the checkpoint value by building the following key:
private String buildCheckpointKeyForShard(String stream, String shardId) {
    return this.consumerGroup + ":" + stream + ":" + shardId;
}
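This seems to mean that I have to create the keys myself before the consumer starts, which requires fetching all the shards from Kinesis first.
If a key does not exist, why is there no generic way to create it? Why doesn't the checkpoint store use putIfAbsent so that keys can be generated on the fly?
Thanks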
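To make the question concrete, here is a minimal sketch of the behavior I expected. It is not the library's code: CheckpointKeySketch, getOrCreateCheckpoint, and the in-memory ConcurrentHashMap standing in for the DynamoDB-backed store are all hypothetical names I made up for illustration. Only the key format is taken from the buildCheckpointKeyForShard snippet above.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical illustration only; not part of spring-integration-aws.
class CheckpointKeySketch {

    // In-memory stand-in for the DynamoDB-backed metadata store (assumption).
    private final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();

    private final String consumerGroup;

    CheckpointKeySketch(String consumerGroup) {
        this.consumerGroup = consumerGroup;
    }

    // Same key format as in the snippet above: group:stream:shardId.
    String buildCheckpointKeyForShard(String stream, String shardId) {
        return this.consumerGroup + ":" + stream + ":" + shardId;
    }

    // Creates the checkpoint entry on first access instead of requiring
    // it to exist beforehand; an existing value is never overwritten.
    String getOrCreateCheckpoint(String stream, String shardId, String initialSequence) {
        String key = buildCheckpointKeyForShard(stream, shardId);
        String existing = store.putIfAbsent(key, initialSequence);
        return existing != null ? existing : initialSequence;
    }
}
```

With something like this, the first lookup for a shard would create the entry, and later lookups would return whatever was stored, so no keys would have to be created up front.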