1

我正在使用 Spring Cloud Stream Kinesis binder(2.1.0 版)

出于安全原因,我必须拥有一组 Kinesis 凭据和另一组 DynamoDB 和 CloudWatch 凭据。

spring.cloud.stream.kinesis.binder.kplKclEnabled如果设置为false ,一切正常。但如果它设置为true我有例外

com.amazonaws.services.kinesis.model.ResourceNotFoundException: Stream {my_stream} under account {my_account} not found

整个堆栈跟踪可在https://pastebin.com/bjvKSzrg

我想启用 KCL,所以有人知道如何避免这个错误吗?

我知道发生错误是因为 cloudwatch 和 dynamodb 的用户凭据没有“看到”提到的 Kinesis 流。但是为什么他们需要看到它呢?此外,如果禁用 KCL,它会按预期工作。所以不明白为什么它不适用于启用的 KCL

这是我的属性文件

spring.main.allow-bean-definition-overriding=true
spring.cloud.stream.bindings.input.destination=streamName
spring.cloud.stream.bindings.input.group=worker
spring.cloud.stream.bindings.input.content-type=application/json
spring.cloud.stream.kinesis.bindings.input.consumer.listener-mode=batch
spring.cloud.stream.bindings.input.binder=kinesisConsumer



spring.cloud.stream.binders.kinesisConsumer.type=kinesis
spring.cloud.stream.binders.kinesisConsumer.defaultCandidate=false
spring.cloud.stream.binders.kinesisConsumer.environment.spring.main.sources=com.philips.ka.oneka.kinesis.config.KinesisOutputConfiguration

cloud.aws.stack.auto=false
cloud.aws.credentials.useDefaultAwsCredentialsChain=false
cloud.aws.credentials.instanceProfile=true

spring.cloud.stream.kinesis.binder.kplKclEnabled=true

提到的配置类

@Configuration
@EnableConfigurationProperties(AwsProperties.class)
public class KinesisOutputConfiguration {
    AwsProperties.Properties properties;

    public KinesisOutputConfiguration(AwsProperties awsProperties) {
        this.properties = awsProperties.getStreamType().get(AwsProperties.StreamType.SPECTRE);
    }

    @Bean(destroyMethod = "shutdown")
    public AmazonKinesisAsync amazonKinesis() {
        RefreshingCredentials refreshingCredentials = new RefreshingCredentials(this.properties.getRefreshed.getUrl(), this.properties.getHsdp().getClientId(),
                this.properties.getRefreshed().getClientSecret(), this.properties.getRefreshed().getUsername(), this.properties.getRefreshed().getPassword(),
                this.properties.getRefreshed().getDiscoveryUrl(), new UriTemplate("{databroker_url}/Stream/$getaccessdetails"),
                new RestTemplate());
        return AmazonKinesisAsyncClientBuilder.standard().withCredentials(credentialsProvider).withRegion("eu-west-1").build();
    }


    @Bean(destroyMethod = "shutdown")
    public AmazonCloudWatchAsync cloudWatch() {
        AWSStaticCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(new BasicAWSCredentials(this.properties.getAccessKey(),
                this.properties.getSecretKey()));
        return AmazonCloudWatchAsyncClientBuilder.standard().withCredentials(credentialsProvider).withRegion("us-east-2").build();
    }

    @Bean(destroyMethod = "shutdown")
    @Primary
    public AmazonDynamoDBAsync dynamoDBAsync() {
        AWSStaticCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(new BasicAWSCredentials(this.properties.getAccessKey(),
                this.properties.getSecretKey()));
        return AmazonDynamoDBAsyncClientBuilder.standard().withCredentials(credentialsProvider).withRegion("us-east-2").build();
    }

}
4

1 回答 1

0

您的配置是正确的:如果您需要为这些服务使用不同的凭据,您肯定需要为它们声明自定义 bean。DynamoDB 和 CloudWatch 是 Kinesis Client Library 所需的服务。它一方面用于管理流分片的偏移量,另一方面用于处理集群中的消费者实例更改以实现分片独占访问。因此,事实上 Kinesis 资源必须可供 DynamoDB 和 CloudWatch 用户使用。

在 Kinesis Client Library 中查看更多信息或咨询 AWS 支持:Kinesis Binder 无法为您解决此问题...

https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kcl.html

于 2021-03-11T15:48:16.890 回答