0

我有一个 KCL 消费者应用程序,其中 1 名工作人员正在使用 dynamodb 流。在检查点表中,我可以看到工作人员已经租用了多个分片/租约,并且它不断增加每个租约的租约计数器。现在,每当 dynamodb 表中发生任何记录更改时,选择该更改大约需要 10 个小时。每次我重新启动消费者应用程序时,我都使用了TRIM_HORIZON创建了一个新的 workerId 。如果工作人员只处理一个分片/租约,则只需几秒钟即可处理更改。我在配置 KCL 工作程序时可能做错了什么。请帮助我。

@Bean
public KinesisClientLibConfiguration bucketShardKinesisClientLibConfiguration() {
    final String workerId =
            getAppName() + BoomerangConstants.UNDERSCORE_DELIMITER + UUID.randomUUID().toString();
        BasicAWSCredentials credentials = new BasicAWSCredentials(awsAccessKey, awsSecretAccessKey);
        AWSStaticCredentialsProvider staticCredentialsProvider = new AWSStaticCredentialsProvider(credentials);
        return new KinesisClientLibConfiguration(getAppName(),
                ddbStreamProperties().getBucketShardDDBStreamArn(), staticCredentialsProvider,
                workerId).withMaxRecords(6)
                .withIdleTimeBetweenReadsInMillis(1000)
                .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
                .withKinesisEndpoint(ddbStreamProperties().getKinesisEndpointUrl())
                .withCallProcessRecordsEvenForEmptyRecordList(true);
    }
4

0 回答 0