我试图弄清楚如何配置 jms 侦听器以侦听 AWS 队列并在许多线程中处理消息(同时约 100 个)。
下面是我的配置。
@Configuration
@EnableJms
public class JmsConfig {
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(AmazonSQS amazonSQS) {
ProviderConfiguration providerConfiguration = new ProviderConfiguration().withNumberOfMessagesToPrefetch(0);
SQSConnectionFactory connectionFactory = new SQSConnectionFactory(providerConfiguration, amazonSQS);
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setDestinationResolver(new DynamicDestinationResolver());
factory.setConcurrency("30-100");
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
factory.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONNECTION);
factory.setErrorHandler(t -> {
});
return factory;
}
}
通过这种配置,我不断收到以下错误:
SQSMessageConsumer - 30秒后无法终止执行器服务ConsumerPrefetch,一些正在运行的线程将立即关闭
此外,将消息发布到AmazonSQS实例需要 20 秒。
我尝试了NumberOfMessagesToPrefetch和CacheLevel的不同组合,但没有一个能正常工作。
例如CacheLevel = CACHE_CONSUMER没有错误但一次处理 1 条消息。
请帮我配置一下。谢谢!
图书馆:
aws-java-sdk:1.11.41
弹簧-jms:5.1.7
amazon-sqs-java-messaging-lib:1.0.6