我有一个 KCL 消费者应用程序,其中 1 名工作人员正在使用 dynamodb 流。在检查点表中,我可以看到工作人员已经租用了多个分片/租约,并且它不断增加每个租约的租约计数器。现在,每当 dynamodb 表中发生任何记录更改时,选择该更改大约需要 10 个小时。每次我重新启动消费者应用程序时,我都使用了TRIM_HORIZON并创建了一个新的 workerId 。如果工作人员只处理一个分片/租约,则只需几秒钟即可处理更改。我在配置 KCL 工作程序时可能做错了什么。请帮助我。
@Bean
public KinesisClientLibConfiguration bucketShardKinesisClientLibConfiguration() {
final String workerId =
getAppName() + BoomerangConstants.UNDERSCORE_DELIMITER + UUID.randomUUID().toString();
BasicAWSCredentials credentials = new BasicAWSCredentials(awsAccessKey, awsSecretAccessKey);
AWSStaticCredentialsProvider staticCredentialsProvider = new AWSStaticCredentialsProvider(credentials);
return new KinesisClientLibConfiguration(getAppName(),
ddbStreamProperties().getBucketShardDDBStreamArn(), staticCredentialsProvider,
workerId).withMaxRecords(6)
.withIdleTimeBetweenReadsInMillis(1000)
.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
.withKinesisEndpoint(ddbStreamProperties().getKinesisEndpointUrl())
.withCallProcessRecordsEvenForEmptyRecordList(true);
}