我正在尝试使用以下代码运行这个简单的 kinesis 消息使用者。这是应用程序中唯一的类
我正面临这个错误,因为我已经更新到最新的 kinesis binder 快照版本
@SpringBootApplication
@RestController
@EnableBinding(Sink.class)
@EnableAutoConfiguration
public class ProducerApplication {
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
}
@StreamListener(Sink.INPUT)
public void listen(String message) {
System.out.println("Message has been received"+message);
}
}
应用 yml
server.port: 8081
spring:
cloud:
stream:
bindings:
input:
destination: my.sink
content-type: application/json
cloud:
aws:
region:
static: us-east-1
credentials:
accessKey: <accessKey>
secretKey: <secretKey>
构建.gradle
buildscript {
ext {
springBootVersion = '2.0.3.RELEASE'
}
repositories {
mavenCentral()
maven { url "https://repo.spring.io/snapshot" }
maven { url "https://repo.spring.io/milestone" }
}
dependencies {
classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
}
}
apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'
group = 'com.kinesis.demo'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8
repositories {
mavenCentral()
maven { url "https://repo.spring.io/snapshot" }
maven { url "https://repo.spring.io/milestone" }
}
dependencies {
compile('org.springframework.boot:spring-boot-starter')
compile('org.springframework.boot:spring-boot-starter-web')
compile('org.springframework.boot:spring-boot-starter-actuator')
compile('org.springframework.cloud:spring-cloud-stream-binder-kinesis:1.0.0.BUILD-SNAPSHOT')
testCompile('org.springframework.boot:spring-boot-starter-test')
}
我收到了 bean 初始化异常,并且在创建 bean 时似乎存在问题DynamoDbMetadataStore
。
2018-07-10 10:53:22.629 INFO 18332 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : Got an exception java.lang.IllegalStateException: The component has not been initialized: DynamoDbMetadataStore{table={SpringIntegrationMetadataStore: {AttributeDefinitions: [{AttributeName: KEY,AttributeType: S}],TableName: SpringIntegrationMetadataStore,KeySchema: [{AttributeName: KEY,KeyType: HASH}],TableStatus: ACTIVE,CreationDateTime: Wed Jun 27 10:51:53 IST 2018,ProvisionedThroughput: {NumberOfDecreasesToday: 0,ReadCapacityUnits: 1,WriteCapacityUnits: 1},TableSizeBytes: 0,ItemCount: 0,TableArn: arn:aws:dynamodb:us-east-1:1234567:table/SpringIntegrationMetadataStore,TableId: d0cf588b-e122-406b-ad82-06255dfea6d4,}}, createTableRetries=25, createTableDelay=1, readCapacity=1, writeCapacity=1, timeToLive=null}.
Is it declared as a bean? during [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=LATEST, sequenceNumber='null', timestamp=null, stream='my.sink', shard='shardId-000000000000', reset=false}, state=NEW}] task invocation.
Process will be retried on the next iteration.
此错误在更新到最新的 kinesis binder 快照版本后开始。
你能检查一下是否有问题。