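I'm having some really bad throughput issues when using SqsMessageDrivenChannelAdapter, which is spring-integration-aws's Spring Integration abstraction around spring-cloud-aws's SimpleMessageListenerContainer.
The problem seems to be that SimpleMessageListenerContainer can only request 10 messages at a time (an AWS limit), and it makes those requests very slowly - in particular, I'm observing ~15 TPS, which is too slow.
Here is my code: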
import com.amazonaws.services.sqs.AmazonSQSAsync;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.aws.messaging.listener.SqsMessageDeletionPolicy;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.aws.inbound.SqsMessageDrivenChannelAdapter;
import org.springframework.integration.channel.ExecutorChannel;
import org.springframework.messaging.MessageChannel;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
public class SqsConfiguration {

    // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ReceiveMessage.html#API_ReceiveMessage_RequestParameters
    private static final int SQS_RECEIVE_MAX_NUM_MESSAGES = 10;

    @Autowired
    private AmazonSQSAsync amazonSQSAsync;

    @Bean
    ThreadPoolTaskExecutor sqsThreadPoolTaskExecutor() {
        final ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
        return threadPoolTaskExecutor;
    }

    @Bean
    MessageChannel receivedSqsMessageChannel() {
        return new ExecutorChannel(sqsThreadPoolTaskExecutor());
    }

    @Bean
    SqsMessageDrivenChannelAdapter sqsMessageDrivenChannelAdapter() {
        final SqsMessageDrivenChannelAdapter sqsMessageDrivenChannelAdapter = new SqsMessageDrivenChannelAdapter(amazonSQSAsync, "...");
        sqsMessageDrivenChannelAdapter.setMessageDeletionPolicy(SqsMessageDeletionPolicy.NEVER);
        sqsMessageDrivenChannelAdapter.setOutputChannel(receivedSqsMessageChannel());
        sqsMessageDrivenChannelAdapter.setMaxNumberOfMessages(SQS_RECEIVE_MAX_NUM_MESSAGES);
        return sqsMessageDrivenChannelAdapter;
    }
}
receivedSqsMessageChannel is the input to a flow that ultimately ends when Acknowledgement#acknowledge() is called on the message.
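SimpleMessageListenerContainer (and therefore SqsMessageDrivenChannelAdapter) does not yet have any way to poll on multiple threads. There is discussion about this in spring-cloud-aws, but it's a long way from being implemented, and only after that would spring-integration-aws introduce support for it. Given that, and given that the TPS I have now is unacceptable, I think the best "fix" for now is to have multiple SqsMessageDrivenChannelAdapters polling concurrently and outputting to receivedSqsMessageChannel.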
How can I achieve this in Spring? Is there a way for me to have a pool of SqsMessageDrivenChannelAdapters running concurrently, using Spring annotations?
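To make the shape of what I'm after concrete, here is a rough JDK-only sketch of several pollers feeding one shared worker pool. It is not Spring or AWS code: the in-memory BlockingQueue stands in for the SQS queue, each poller thread stands in for one adapter, the worker pool stands in for the ExecutorChannel's executor, and the names PollerPoolSketch and pollConcurrently are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PollerPoolSketch {

    // Mirrors the SQS ReceiveMessage cap of 10 messages per request.
    static final int MAX_BATCH = 10;

    /**
     * Starts pollerCount pollers that drain the source in batches of at most
     * MAX_BATCH and hand every message to one shared worker pool.
     * Returns the messages the workers handled.
     */
    static Queue<String> pollConcurrently(BlockingQueue<String> source, int pollerCount)
            throws InterruptedException {
        Queue<String> handled = new ConcurrentLinkedQueue<>();
        // Stand-in for the ExecutorChannel's executor: one pool shared by all pollers.
        ExecutorService workers =
                Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        // One thread per poller, each standing in for one adapter (assumption).
        ExecutorService pollers = Executors.newFixedThreadPool(pollerCount);

        for (int i = 0; i < pollerCount; i++) {
            pollers.execute(() -> {
                List<String> batch = new ArrayList<>(MAX_BATCH);
                // drainTo takes at most MAX_BATCH messages, like a single receive call.
                while (source.drainTo(batch, MAX_BATCH) > 0) {
                    for (String message : batch) {
                        workers.execute(() -> handled.add(message));
                    }
                    batch.clear();
                }
                // A real poller would keep polling; the sketch stops once the queue is empty.
            });
        }

        // Wait for the pollers first so nothing is submitted after the workers shut down.
        pollers.shutdown();
        pollers.awaitTermination(10, TimeUnit.SECONDS);
        workers.shutdown();
        workers.awaitTermination(10, TimeUnit.SECONDS);
        return handled;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> source = new LinkedBlockingQueue<>();
        for (int i = 0; i < 1000; i++) {
            source.add("message-" + i);
        }
        Queue<String> handled = pollConcurrently(source, 4);
        System.out.println("handled " + handled.size() + " messages");
    }
}
```

With a per-request cap of 10 messages, the only way to pull more per second in this model is to have more receive loops in flight at once, which is why I'm asking about a pool of adapters rather than tuning the single one.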