我已经使用 kinesis 设置了 spring boot、spring-integration-aws 并在应用程序启动时不断出现以下错误。
我不知道我的配置是否正确?
我的 maven pom.xml 正确吗?
如果 spring.io 可以为 spring-integration 和 aws-kinisis 提供端到端的示例应用程序,那就太好了。
任何帮助赞赏消除此错误。它不断地重复。
提前谢谢你。
2018-03-21 19:38:20.763 INFO 8417 --- [is-dispatcher-1] a.i.k.KinesisMessageDrivenChannelAdapter : Got an exception when describing stream [TestStream]. Backing off for [1000] millis.
com.amazonaws.services.kinesis.model.ResourceNotFoundException: Stream TestStream under account 000000000000 not found. (Service: AmazonKinesis; Status Code: 400; Error Code: ResourceNotFoundException; Request ID: 694ca3a0-2d3f-11e8-b393-0de7c3ab0fa7)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1630) ~[aws-java-sdk-core-1.11.297.jar:na]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1302) ~[aws-java-sdk-core-1.11.297.jar:na]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056) ~[aws-java-sdk-core-1.11.297.jar:na]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743) ~[aws-java-sdk-core-1.11.297.jar:na]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717) ~[aws-java-sdk-core-1.11.297.jar:na]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699) ~[aws-java-sdk-core-1.11.297.jar:na]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667) ~[aws-java-sdk-core-1.11.297.jar:na]
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649) ~[aws-java-sdk-core-1.11.297.jar:na]
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513) ~[aws-java-sdk-core-1.11.297.jar:na]
at com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2388) ~[aws-java-sdk-kinesis-1.11.297.jar:na]
at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2364) ~[aws-java-sdk-kinesis-1.11.297.jar:na]
at com.amazonaws.services.kinesis.AmazonKinesisClient.executeDescribeStream(AmazonKinesisClient.java:754) ~[aws-java-sdk-kinesis-1.11.297.jar:na]
at com.amazonaws.services.kinesis.AmazonKinesisClient.describeStream(AmazonKinesisClient.java:729) ~[aws-java-sdk-kinesis-1.11.297.jar:na]
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter.lambda$populateShardsForStream$0(KinesisMessageDrivenChannelAdapter.java:467) ~[spring-integration-aws-2.0.0.BUILD-20180319.195200-12.jar:na]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_91]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_91]
at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_91]
@Configuration
@EnableIntegration
public class ReceiverConfig {
private final int port = 4567;
private KinesisMessageDrivenChannelAdapter kinesisMessageDrivenChannelAdapter(AmazonKinesis amazonKinesis) {
KinesisMessageDrivenChannelAdapter adapter =
new KinesisMessageDrivenChannelAdapter(amazonKinesis, "TestStream");
adapter.setOutputChannel(kinesisReceiveChannel());
adapter.setErrorMessageStrategy(new KinesisMessageHeaderErrorMessageStrategy());
adapter.setErrorChannel(errorChannel());
return adapter;
}
@Bean
public AmazonKinesis amazonKinesis() {
AmazonKinesis amazonKinesis;
String url = "http://localhost:" + this.port;
// See https://github.com/mhart/kinesalite#cbor-protocol-issues-with-the-java-sdk
System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true");
amazonKinesis = AmazonKinesisAsyncClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("", "")))
.withClientConfiguration(
new ClientConfiguration()
.withMaxErrorRetry(0)
.withConnectionTimeout(1000))
.withEndpointConfiguration(
new AwsClientBuilder.EndpointConfiguration(
url,
Regions.DEFAULT_REGION.getName()))
.build();
//System.clearProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY);
return amazonKinesis;
}
@Bean
public KinesisMessageDrivenChannelAdapter kinesisInboundChannelChannel1(AmazonKinesis amazonKinesis) {
return kinesisMessageDrivenChannelAdapter(amazonKinesis);
}
@Bean
public PollableChannel kinesisReceiveChannel() {
QueueChannel queueChannel = new QueueChannel();
queueChannel.setDatatypes(String.class);
return queueChannel;
}
@Bean
public PollableChannel errorChannel() {
QueueChannel queueChannel = new QueueChannel();
queueChannel.addInterceptor(new ChannelInterceptorAdapter() {
@Override
public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
super.postSend(message, channel, sent);
if (message instanceof ErrorMessage) {
throw (RuntimeException) ((ErrorMessage) message).getPayload();
}
}
});
return queueChannel;
}
}
@Component
public class EventProcessor {
@ServiceActivator(inputChannel = "kinesisReceiveChannel", poller = {@Poller(fixedRate = "1000")} )
public void onEvent(String event) {
System.out.println("event: " + event);
}
}
@SpringBootApplication
public class KreceiverApplication implements CommandLineRunner {
private static Log logger = LogFactory.getLog(KreceiverApplication.class);
private static final String TEST_STREAM = "TestStream";
@Autowired
private AmazonKinesis amazonKinesis;
public static void main(String[] args) {
SpringApplication.run(KreceiverApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
try {
this.amazonKinesis.listStreams();
} catch (SdkClientException e) {
logger.warn("Tests not running because no Kinesis on ...", e);
}
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>kreceiver</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>kreceiver</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.0.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spring-cloud-aws.version>1.2.2.RELEASE</spring-cloud-aws.version>
<spring-integration-aws-version>2.0.0.BUILD-SNAPSHOT</spring-integration-aws-version>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-aws</artifactId>
<version>${spring-integration-aws-version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-aws-dependencies</artifactId>
<version>${spring-cloud-aws.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-kinesis</artifactId>
<version>1.11.297</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>