我正在使用 Spring Cloud Stream Kinesis binder(2.1.0 版)。
出于安全原因,我必须为 Kinesis 使用一组凭据,为 DynamoDB 和 CloudWatch 使用另一组凭据。
spring.cloud.stream.kinesis.binder.kplKclEnabled
如果将该属性设置为 false,一切正常;但如果设置为 true,就会抛出以下异常:
com.amazonaws.services.kinesis.model.ResourceNotFoundException: Stream {my_stream} under account {my_account} not found
完整的堆栈跟踪见 https://pastebin.com/bjvKSzrg
我想启用 KCL,所以有人知道如何避免这个错误吗?
我知道出现这个错误是因为 CloudWatch 和 DynamoDB 所用的用户凭据“看不到”上面提到的 Kinesis 流。但是它们为什么需要看到这个流呢?此外,禁用 KCL 时一切都按预期工作,所以我不明白为什么启用 KCL 之后就不行了。
这是我的属性文件:
# Allow a bean definition to override an existing one with the same name.
spring.main.allow-bean-definition-overriding=true
# "input" binding: destination stream, consumer group, and payload content type.
spring.cloud.stream.bindings.input.destination=streamName
spring.cloud.stream.bindings.input.group=worker
spring.cloud.stream.bindings.input.content-type=application/json
# Kinesis-specific consumer setting for the "input" binding: batch listener mode.
spring.cloud.stream.kinesis.bindings.input.consumer.listener-mode=batch
# Bind "input" through the binder named "kinesisConsumer", declared below.
spring.cloud.stream.bindings.input.binder=kinesisConsumer
# Binder "kinesisConsumer": Kinesis type, not a default candidate, with
# KinesisOutputConfiguration listed as a source for the binder's own environment.
spring.cloud.stream.binders.kinesisConsumer.type=kinesis
spring.cloud.stream.binders.kinesisConsumer.defaultCandidate=false
spring.cloud.stream.binders.kinesisConsumer.environment.spring.main.sources=com.philips.ka.oneka.kinesis.config.KinesisOutputConfiguration
# Spring Cloud AWS settings: stack auto-detection off, default credentials
# chain off, instance-profile credentials on.
cloud.aws.stack.auto=false
cloud.aws.credentials.useDefaultAwsCredentialsChain=false
cloud.aws.credentials.instanceProfile=true
# Enables KPL/KCL in the Kinesis binder; this is the setting that triggers the
# ResourceNotFoundException described in the question.
spring.cloud.stream.kinesis.binder.kplKclEnabled=true
上面属性文件中引用的配置类:
/**
 * Spring configuration that declares the AWS async clients for Kinesis,
 * CloudWatch and DynamoDB.
 *
 * <p>The Kinesis client is built with a {@code RefreshingCredentials} instance,
 * while the CloudWatch and DynamoDB clients share static access-key/secret-key
 * credentials. All values are read from the {@code SPECTRE} entry of
 * {@link AwsProperties}.
 */
@Configuration
@EnableConfigurationProperties(AwsProperties.class)
public class KinesisOutputConfiguration {

    /** Settings of the {@code SPECTRE} stream type; assigned once in the constructor. */
    final AwsProperties.Properties properties;

    /**
     * Selects the {@code SPECTRE} stream-type settings from the bound properties.
     *
     * @param awsProperties the bound AWS configuration properties
     */
    public KinesisOutputConfiguration(AwsProperties awsProperties) {
        this.properties = awsProperties.getStreamType().get(AwsProperties.StreamType.SPECTRE);
    }

    /**
     * Creates the Kinesis async client for region {@code eu-west-1}.
     *
     * @return the Kinesis client; {@code shutdown} is invoked on bean destruction
     */
    @Bean(destroyMethod = "shutdown")
    public AmazonKinesisAsync amazonKinesis() {
        // NOTE(review): the client id is read from getHsdp() while every other value
        // comes from getRefreshed() -- confirm this is intentional.
        RefreshingCredentials refreshingCredentials = new RefreshingCredentials(
                this.properties.getRefreshed().getUrl(),
                this.properties.getHsdp().getClientId(),
                this.properties.getRefreshed().getClientSecret(),
                this.properties.getRefreshed().getUsername(),
                this.properties.getRefreshed().getPassword(),
                this.properties.getRefreshed().getDiscoveryUrl(),
                new UriTemplate("{databroker_url}/Stream/$getaccessdetails"),
                new RestTemplate());
        // Use the credentials built above; the previous code referenced an undefined
        // variable here. Assumes RefreshingCredentials implements
        // AWSCredentialsProvider -- TODO confirm.
        return AmazonKinesisAsyncClientBuilder.standard()
                .withCredentials(refreshingCredentials)
                .withRegion("eu-west-1")
                .build();
    }

    /**
     * Creates the CloudWatch async client for region {@code us-east-2} using the
     * static access-key/secret-key credentials.
     *
     * @return the CloudWatch client; {@code shutdown} is invoked on bean destruction
     */
    @Bean(destroyMethod = "shutdown")
    public AmazonCloudWatchAsync cloudWatch() {
        // NOTE(review): this region differs from the Kinesis client's eu-west-1 --
        // confirm this is intentional.
        return AmazonCloudWatchAsyncClientBuilder.standard()
                .withCredentials(staticCredentialsProvider())
                .withRegion("us-east-2")
                .build();
    }

    /**
     * Creates the primary DynamoDB async client for region {@code us-east-2} using
     * the static access-key/secret-key credentials.
     *
     * @return the DynamoDB client; {@code shutdown} is invoked on bean destruction
     */
    @Bean(destroyMethod = "shutdown")
    @Primary
    public AmazonDynamoDBAsync dynamoDBAsync() {
        return AmazonDynamoDBAsyncClientBuilder.standard()
                .withCredentials(staticCredentialsProvider())
                .withRegion("us-east-2")
                .build();
    }

    /** Builds a static credentials provider from the configured access key and secret key. */
    private AWSStaticCredentialsProvider staticCredentialsProvider() {
        return new AWSStaticCredentialsProvider(
                new BasicAWSCredentials(this.properties.getAccessKey(), this.properties.getSecretKey()));
    }
}