0

我已经使用 kinesis 设置了 spring boot、spring-integration-aws 并在应用程序启动时不断出现以下错误。

  1. 我不知道我的配置是否正确?

  2. 我的 maven pom.xml 正确吗?

  3. 如果 spring.io 可以为 spring-integration 和 aws-kinisis 提供端到端的示例应用程序,那就太好了。

任何帮助赞赏消除此错误。它不断地重复。

提前谢谢你。

2018-03-21 19:38:20.763  INFO 8417 --- [is-dispatcher-1] a.i.k.KinesisMessageDrivenChannelAdapter : Got an exception when describing stream [TestStream]. Backing off for [1000] millis.

com.amazonaws.services.kinesis.model.ResourceNotFoundException: Stream TestStream under account 000000000000 not found. (Service: AmazonKinesis; Status Code: 400; Error Code: ResourceNotFoundException; Request ID: 694ca3a0-2d3f-11e8-b393-0de7c3ab0fa7)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1630) ~[aws-java-sdk-core-1.11.297.jar:na]
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1302) ~[aws-java-sdk-core-1.11.297.jar:na]
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056) ~[aws-java-sdk-core-1.11.297.jar:na]
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743) ~[aws-java-sdk-core-1.11.297.jar:na]
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717) ~[aws-java-sdk-core-1.11.297.jar:na]
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699) ~[aws-java-sdk-core-1.11.297.jar:na]
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667) ~[aws-java-sdk-core-1.11.297.jar:na]
    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649) ~[aws-java-sdk-core-1.11.297.jar:na]
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513) ~[aws-java-sdk-core-1.11.297.jar:na]
    at com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2388) ~[aws-java-sdk-kinesis-1.11.297.jar:na]
    at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2364) ~[aws-java-sdk-kinesis-1.11.297.jar:na]
    at com.amazonaws.services.kinesis.AmazonKinesisClient.executeDescribeStream(AmazonKinesisClient.java:754) ~[aws-java-sdk-kinesis-1.11.297.jar:na]
    at com.amazonaws.services.kinesis.AmazonKinesisClient.describeStream(AmazonKinesisClient.java:729) ~[aws-java-sdk-kinesis-1.11.297.jar:na]
    at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter.lambda$populateShardsForStream$0(KinesisMessageDrivenChannelAdapter.java:467) ~[spring-integration-aws-2.0.0.BUILD-20180319.195200-12.jar:na]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_91]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_91]
    at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_91]

@Configuration
@EnableIntegration
public class ReceiverConfig {

    private final int port = 4567;

    private KinesisMessageDrivenChannelAdapter kinesisMessageDrivenChannelAdapter(AmazonKinesis amazonKinesis) {
        KinesisMessageDrivenChannelAdapter adapter =
                new KinesisMessageDrivenChannelAdapter(amazonKinesis, "TestStream");
        adapter.setOutputChannel(kinesisReceiveChannel());
        adapter.setErrorMessageStrategy(new KinesisMessageHeaderErrorMessageStrategy());
        adapter.setErrorChannel(errorChannel());
        return adapter;
    }

    @Bean
    public AmazonKinesis amazonKinesis() {
        AmazonKinesis amazonKinesis;

        String url = "http://localhost:" + this.port;

        // See https://github.com/mhart/kinesalite#cbor-protocol-issues-with-the-java-sdk
        System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true");

        amazonKinesis = AmazonKinesisAsyncClientBuilder.standard()
                .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("", "")))
                .withClientConfiguration(
                        new ClientConfiguration()
                                .withMaxErrorRetry(0)
                                .withConnectionTimeout(1000))
                .withEndpointConfiguration(
                        new AwsClientBuilder.EndpointConfiguration(
                                url,
                                Regions.DEFAULT_REGION.getName()))
                .build();

        //System.clearProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY);

        return amazonKinesis;
    }

    @Bean
    public KinesisMessageDrivenChannelAdapter kinesisInboundChannelChannel1(AmazonKinesis amazonKinesis) {
        return kinesisMessageDrivenChannelAdapter(amazonKinesis);
    }

    @Bean
    public PollableChannel kinesisReceiveChannel() {
        QueueChannel queueChannel = new QueueChannel();
        queueChannel.setDatatypes(String.class);
        return queueChannel;
    }

    @Bean
    public PollableChannel errorChannel() {
        QueueChannel queueChannel = new QueueChannel();
        queueChannel.addInterceptor(new ChannelInterceptorAdapter() {

            @Override
            public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
                super.postSend(message, channel, sent);

                if (message instanceof ErrorMessage) {
                    throw (RuntimeException) ((ErrorMessage) message).getPayload();
                }
            }
        });
        return queueChannel;
    }
}



@Component
public class EventProcessor {

    @ServiceActivator(inputChannel = "kinesisReceiveChannel", poller = {@Poller(fixedRate = "1000")} )
    public void onEvent(String event) {
        System.out.println("event: " + event);
    }
}


@SpringBootApplication
public class KreceiverApplication implements CommandLineRunner {

    private static Log logger = LogFactory.getLog(KreceiverApplication.class);

    private static final String TEST_STREAM = "TestStream";

    @Autowired
    private AmazonKinesis amazonKinesis;

    public static void main(String[] args) {
        SpringApplication.run(KreceiverApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {

        try {
            this.amazonKinesis.listStreams();
        } catch (SdkClientException e) {
            logger.warn("Tests not running because no Kinesis on ...", e);
        }
    }
}

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>kreceiver</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>kreceiver</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

        <spring-cloud-aws.version>1.2.2.RELEASE</spring-cloud-aws.version>
        <spring-integration-aws-version>2.0.0.BUILD-SNAPSHOT</spring-integration-aws-version>

        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-aws</artifactId>
            <version>${spring-integration-aws-version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-aws-dependencies</artifactId>
            <version>${spring-cloud-aws.version}</version>
        </dependency>

        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-kinesis</artifactId>
            <version>1.11.297</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>


</project>
4

1 回答 1

1

看起来您将kinesalite根据您在amazonKinesis(). 并且在堆栈跟踪中出现错误,例如:

 com.amazonaws.services.kinesis.model.ResourceNotFoundException: Stream TestStream under account 000000000000 not found.

这真的是说TestStream当地的 kinesalite 资源中没有流。

不确定您想从 Spring IO 中听到什么,但您在使用 AWS Kinesis 本身时遇到了一个概念错误:在开始使用之前,流必须存在于 Kinesis 中。这正是任何 AWS 资源的规则。它不会按需创建。

当然,你可以自己做。这正是我们在集成测试中所做的:

KINESIS_LOCAL_RUNNING.getKinesis().createStream(TEST_STREAM, 1);

我建议您添加这样的代码:

this.amazonKinesis.createStream(TEST_STREAM);

进入你的run(String... args).

于 2018-03-21T20:21:05.213 回答