3

我创建了一个SQS使用@SqsListener. 它工作正常,但我不断收到大量类似的消息:

org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@372b568[Running, pool size = 3, active threads = 3, queued tasks = 0, completed tasks = 0]] 不接受任务: org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer$SignalExecutingRunnable@4c30c2f9 at org.springframework.scheduling.concurrent.ThreadPoolTask​​Executor.execute(ThreadPoolTask​​Executor.java:317) ~[spring-context-5.1.4.RELEASE.jar: 5.1.4.RELEASE] 在 org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer$AsynchronousMessageListener.run(SimpleMessageListenerContainer.java:286) ~[spring-cloud-aws-messaging-2.1.0.RELEASE.jar:2.1 .0.RELEASE] 在 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_171] 在 java.util.concurrent。FutureTask.run(FutureTask.java:266) [na:1.8.0_171] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_171] at java.util.concurrent.ThreadPoolExecutor$ Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_171] at java.lang.Thread.run(Thread.java:748) [na:1.8.0_171] 原因:java.util.concurrent.RejectedExecutionException:任务 org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer$SignalExecutingRunnable@4c30c2f9 从 java.util.concurrent.ThreadPoolExecutor@372b568 被拒绝 [正在运行,池大小 = 3,活动线程 = 3,排队任务 = 0,已完成任务 = 0] 在 java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063) ~[na:1.8.0_171] 在 java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830) [na:1.8.0_171] at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379) [na:1.8.0_171] at org.springframework.scheduling.concurrent.ThreadPoolTask​​Executor.execute(ThreadPoolTask​​Executor. java:314) ~[spring-context-5.1.4.RELEASE.jar:5.1.4.RELEASE] ...省略了6个常用框架

我的配置bean:

@EnableSqs
@Configuration
public class AmazonSqsConfiguration {

    @Value("${aws.sqs.accessKey}")
    private String accessKey;

    @Value("${aws.sqs.secretKey}")
    private String secretKey;

    @Value("${aws.sqs.region}")
    private String region;

    @Value("${aws.sqs.url}")
    private String url;

    @Bean
    public AmazonSQSAsync amazonSqs() {
        AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);
        AWSStaticCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(credentials);
        return AmazonSQSAsyncClientBuilder.standard()
                .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(url, region))
                .withCredentials(credentialsProvider)
                .build();
    }

}

我的消费者如下:

@SqsListener(value = "my-queue", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
    public void processSubscription(String xmlNotification) {/* Message processor */}

是否可以通过重新配置来删除它们@Bean?问题的根本原因是什么以及如何与之抗争?

我试图通过自然搜索找到解决方案并遇到以下答案。它对我不起作用,因为我没有JMS. 我无法调试,因为我什至不知道要调试什么。

4

1 回答 1

5

我找到了与我遇到的行为相关的spring-cloud-aws票。我也找到了相关的 StackOverflow 问题

因此,对我有用的解决方案如下:

@Bean
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSQS) {
    SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
    factory.setAmazonSqs(amazonSQS);
    factory.setMaxNumberOfMessages(10);
    factory.setAutoStartup(true);
    factory.setWaitTimeOut(20);

    return factory;
}
于 2019-03-20T07:27:58.440 回答