I have the following configuration for the checkpoint store:

  public DynamoDbMetaDataStore dynamoDbMetaDataStore() {
    String url = consumerClientProperties.getDynamoDB().getUrl();
    final AmazonDynamoDBAsync amazonDynamoDB = AmazonDynamoDBAsyncClientBuilder.standard()
        .withEndpointConfiguration(new EndpointConfiguration(
            url,
            Regions.fromName(awsRegion).getName()))
        .withClientConfiguration(new ClientConfiguration()
            .withMaxErrorRetry(consumerClientProperties.getDynamoDB().getRetries())
            .withConnectionTimeout(consumerClientProperties.getDynamoDB().getConnectionTimeout())).build();
    DynamoDbMetaDataStore dynamoDbMetaDataStore = new DynamoDbMetaDataStore(amazonDynamoDB, "consumer-client-library");
    return dynamoDbMetaDataStore;
  }

  public KinesisMessageDrivenChannelAdapter kinesisInboundChannelChannel(
      AmazonKinesis amazonKinesis, String[] streamNames) {
    KinesisMessageDrivenChannelAdapter adapter =
        new KinesisMessageDrivenChannelAdapter(amazonKinesis, streamNames);
    adapter.setConverter(null);
    adapter.setOutputChannel(kinesisReceiveChannel());
    adapter.setCheckpointStore(dynamoDbMetaDataStore());
    adapter.setCheckpointMode(CheckpointMode.manual);
    adapter.setListenerMode(ListenerMode.batch);
    return adapter;
  }

When the consumer starts, it looks up the checkpoint value by building the following key:

  private String buildCheckpointKeyForShard(String stream, String shardId) {
    return this.consumerGroup + ":" + stream + ":" + shardId;
  }

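As far as I can tell, this means I have to create the keys before the consumer starts, which in turn requires fetching all the shards from Kinesis.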

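If a key does not exist, why is there no generic way to create it? Why doesn't the checkpoint store use putIfAbsent so that the keys can be generated on the fly?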
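
To make the question concrete, here is a minimal sketch of the put-if-absent behaviour I have in mind. It is not the library's code: a plain ConcurrentHashMap stands in for the DynamoDB-backed store, the class and method names (CheckpointKeySeeder, seedMissingKeys) are hypothetical, the shard IDs are hard-coded instead of being fetched from Kinesis, and the initial value is only a placeholder. The key format is the same consumerGroup:stream:shardId shown above.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper; an in-memory map stands in for the real checkpoint store.
class CheckpointKeySeeder {

    // Same key format as buildCheckpointKeyForShard in the question.
    static String buildCheckpointKey(String consumerGroup, String stream, String shardId) {
        return consumerGroup + ":" + stream + ":" + shardId;
    }

    // Creates a key for every shard that has none yet and leaves existing
    // checkpoints untouched. Returns the number of keys that were created.
    static int seedMissingKeys(Map<String, String> store, String consumerGroup,
            String stream, List<String> shardIds, String initialValue) {
        int created = 0;
        for (String shardId : shardIds) {
            String key = buildCheckpointKey(consumerGroup, stream, shardId);
            // putIfAbsent returns null only when the key was missing and has now been written.
            if (store.putIfAbsent(key, initialValue) == null) {
                created++;
            }
        }
        return created;
    }

    public static void main(String[] args) {
        Map<String, String> store = new ConcurrentHashMap<>();
        // One shard already has a checkpoint (placeholder value).
        store.put("consumer-group:my-stream:shardId-000000000000", "existing-checkpoint");

        // Assumption: in a real setup these IDs would come from listing the stream's shards.
        List<String> shardIds = List.of("shardId-000000000000", "shardId-000000000001");

        int created = seedMissingKeys(store, "consumer-group", "my-stream", shardIds, "placeholder");
        System.out.println("keys created: " + created);
        System.out.println(store);
    }
}
```

With this approach a missing key would be written the first time a shard is seen, while an existing checkpoint would never be overwritten, so nothing would have to be created up front.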

Thanks
