它需要一些摆弄,并尝试与其他 SO 答案不同的东西。
这是我的代码,我会尽力解释。我包括了我为我的 SQS 消费者使用的所有东西。
我的配置类如下。下面唯一需要注意的是在 queueMessageHandlerFactory 方法中实例化的转换器和解析器对象。MappingJackson2MessageConverter(以防从非常明显的类名中看不出来)类处理来自 SQS 的有效负载的反序列化。
将严格的内容类型匹配设置为 false 也很重要。
此外,MappingJackson2MessageConverter 允许您设置自己的 Jackson ObjectMapper,但是如果您这样做,则需要按如下方式配置它:
objectMapper.configure(MapperFeature.DEFAULT_VIEW_INCLUSION, false);
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
您可能不想这样做,因此您可以将其保留为 null,它将创建自己的 ObjectMapper。
我认为其余的代码是不言自明的......?如果没有,请告诉我。
我们的用例之间的一个区别是,您似乎正在映射您自己的自定义对象 (SqsEventDTO),我认为这是可行的?在这种情况下,我认为您不需要 MappingJackson2MessageConverter,但我可能是错的。
@Configuration
public class AppConfig {
@Bean
@Primary
public QueueMessageHandler queueMessageHandler(@Autowired QueueMessageHandlerFactory queueMessageHandlerFactory) {
return queueMessageHandlerFactory.createQueueMessageHandler();
}
@Bean
@Primary
public QueueMessageHandlerFactory queueMessageHandlerFactory(@Autowired AmazonSQSAsync sqsClient) {
QueueMessageHandlerFactory factory = new QueueMessageHandlerFactory();
factory.setAmazonSqs(sqsClient);
MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();
messageConverter.setSerializedPayloadClass(String.class);
//set strict content type match to false
messageConverter.setStrictContentTypeMatch(false);
// Uses the MappingJackson2MessageConverter object to resolve/map
// the payload against the Message/S3EventNotification argument.
PayloadArgumentResolver payloadResolver = new PayloadArgumentResolver(messageConverter);
// Extract the acknowledgment data from the payload's headers,
// which then gets deserialized into the Acknowledgment object.
AcknowledgmentHandlerMethodArgumentResolver acknowledgmentResolver = new AcknowledgmentHandlerMethodArgumentResolver("Acknowledgment");
// I don't remember the specifics of WHY, however there is
// something important about the order of the argument resolvers
// in the list
factory.setArgumentResolvers(Arrays.asList(acknowledgmentResolver, payloadResolver));
return factory;
}
@Bean("ConsumerBean")
@Primary
public SimpleMessageListenerContainer simpleMessageListenerContainer(@Autowired AmazonSQSAsync amazonSQSAsync, @Autowired QueueMessageHandler queueMessageHandler,
@Autowired ThreadPoolTaskExecutor threadPoolExecutor) {
SimpleMessageListenerContainer smlc = new SimpleMessageListenerContainer();
smlc.setWaitTimeOut(20);
smlc.setAmazonSqs(amazonSQSAsync);
smlc.setMessageHandler(queueMessageHandler);
smlc.setBeanName("ConsumerBean");
smlc.setMaxNumberOfMessages(sqsMaxMessages);
smlc.setTaskExecutor(threadPoolExecutor);
return smlc;
}
@Bean
@Primary
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setAllowCoreThreadTimeOut(coreThreadsTimeout);
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setMaxPoolSize(maxPoolSize);
executor.setKeepAliveSeconds(threadTimeoutSeconds);
executor.setThreadNamePrefix(threadName);
executor.initialize();
return executor;
}
}
我的 SQS 消费者服务类如下。
@Service
public class RawConsumer {
@SqsListener(deletionPolicy = SqsMessageDeletionPolicy.NEVER, value = "${input.sqs.queuename}")
public void sqsListener(S3EventNotification event, Acknowledgment ack) throws Exception {
// Handle event here
}
希望对您有所帮助,如果您有任何问题,请告诉我。