7

我正在尝试使用 spring-cloud-aws-messaging 手动删除 AWS SQS 消息来实现逻辑。此功能是在测试中的示例中在此票证范围内实现的

@SqsListener(value = "queueName", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
public void listen(SqsEventDTO message, Acknowledgment acknowledgment) {

    LOGGER.info("Received message {}", message.getFoo());
    try {
        acknowledgment.acknowledge().get();
    } catch (InterruptedException e) {
        LOGGER.error("Opps", e);
    } catch (ExecutionException e) {
        LOGGER.error("Opps", e);
    }
}

但面对意外的异常

com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance oforg.springframework.cloud.aws.messaging.listener.Acknowledgment(no Creators, like default construct, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information

可行的解决方案,SqsMessageDeletionPolicy.ON_SUCCESS但我想避免引发异常。

我在配置中错过了什么?

4

2 回答 2

9

它需要一些摆弄,并尝试与其他 SO 答案不同的东西。

这是我的代码,我会尽力解释。我包括了我为我的 SQS 消费者使用的所有东西。

我的配置类如下。下面唯一需要注意的是在 queueMessageHandlerFactory 方法中实例化的转换器和解析器对象。MappingJackson2MessageConverter(以防从非常明显的类名中看不出来)类处理来自 SQS 的有效负载的反序列化。

将严格的内容类型匹配设置为 false 也很重要。

此外,MappingJackson2MessageConverter 允许您设置自己的 Jackson ObjectMapper,但是如果您这样做,则需要按如下方式配置它:

objectMapper.configure(MapperFeature.DEFAULT_VIEW_INCLUSION, false);
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

您可能不想这样做,因此您可以将其保留为 null,它将创建自己的 ObjectMapper。

我认为其余的代码是不言自明的......?如果没有,请告诉我。

我们的用例之间的一个区别是,您似乎正在映射您自己的自定义对象 (SqsEventDTO),我认为这是可行的?在这种情况下,我认为您不需要 MappingJackson2MessageConverter,但我可能是错的。

@Configuration
public class AppConfig {

@Bean
@Primary
public QueueMessageHandler queueMessageHandler(@Autowired QueueMessageHandlerFactory queueMessageHandlerFactory) {
    return queueMessageHandlerFactory.createQueueMessageHandler();
}

@Bean
@Primary
public QueueMessageHandlerFactory queueMessageHandlerFactory(@Autowired AmazonSQSAsync sqsClient) {

    QueueMessageHandlerFactory factory = new QueueMessageHandlerFactory();
    factory.setAmazonSqs(sqsClient);

    MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();
    messageConverter.setSerializedPayloadClass(String.class);

    //set strict content type match to false
    messageConverter.setStrictContentTypeMatch(false);

    // Uses the MappingJackson2MessageConverter object to resolve/map 
    // the payload against the Message/S3EventNotification argument.
    PayloadArgumentResolver payloadResolver = new PayloadArgumentResolver(messageConverter);

    // Extract the acknowledgment data from the payload's headers, 
    // which then gets deserialized into the Acknowledgment object.  
    AcknowledgmentHandlerMethodArgumentResolver acknowledgmentResolver = new AcknowledgmentHandlerMethodArgumentResolver("Acknowledgment");

    // I don't remember the specifics of WHY, however there is 
    // something important about the order of the argument resolvers 
    // in the list
    factory.setArgumentResolvers(Arrays.asList(acknowledgmentResolver, payloadResolver));

    return factory;
}

@Bean("ConsumerBean")
@Primary
public SimpleMessageListenerContainer simpleMessageListenerContainer(@Autowired AmazonSQSAsync amazonSQSAsync, @Autowired QueueMessageHandler queueMessageHandler,
    @Autowired ThreadPoolTaskExecutor threadPoolExecutor) {

    SimpleMessageListenerContainer smlc = new SimpleMessageListenerContainer();
    smlc.setWaitTimeOut(20);
    smlc.setAmazonSqs(amazonSQSAsync);
    smlc.setMessageHandler(queueMessageHandler);
    smlc.setBeanName("ConsumerBean");
    smlc.setMaxNumberOfMessages(sqsMaxMessages);
    smlc.setTaskExecutor(threadPoolExecutor);

    return smlc;
}

@Bean
@Primary
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

    executor.setCorePoolSize(corePoolSize);
    executor.setAllowCoreThreadTimeOut(coreThreadsTimeout);
    executor.setWaitForTasksToCompleteOnShutdown(true);
    executor.setMaxPoolSize(maxPoolSize);
    executor.setKeepAliveSeconds(threadTimeoutSeconds);
    executor.setThreadNamePrefix(threadName);
    executor.initialize();

    return executor;
}
}

我的 SQS 消费者服务类如下。

@Service
public class RawConsumer {

@SqsListener(deletionPolicy = SqsMessageDeletionPolicy.NEVER, value = "${input.sqs.queuename}")
public void sqsListener(S3EventNotification event, Acknowledgment ack) throws Exception {
    // Handle event here
}

希望对您有所帮助,如果您有任何问题,请告诉我。

于 2018-12-23T22:41:32.090 回答
7

问题作者没有提到的是他试图定制 Jackson ObjectMapper。因此,他实例化了 a MappingJackson2MessageConverter,将其包裹在 a 中PayloadArgumentResolver并将其设置为单HandlerMethodArgumentResolveron QueueMessageHandlerFactory.setArgumentResolvers()。这样做会覆盖中定义的默认参数解析器列表QueueMessageHandler.initArgumentResolvers()(在创建QueueMessageHandler内部实例时调用QueueMessageHandlerFactory)。

例如,当仅将 aPayloadArgumentResolver设置为单个参数解析器时,Acknowledgement不能再绑定该参数。

因此,比为自定义 Jackson 消息转换器覆盖参数解析器列表更好的解决方案是在以下位置设置消息转换器列表QueueMessageHandlerFactory

    @Bean
    fun queueMessageHandlerFactory(objectMapper: ObjectMapper): QueueMessageHandlerFactory {
        val factory = QueueMessageHandlerFactory()

        val messageConverter = MappingJackson2MessageConverter()
        messageConverter.objectMapper = objectMapper

        factory.setMessageConverters(listOf(messageConverter)) // <-- this is the important line.
        return factory
    }

注册MessageConverters的在里面QueueMessageHandler.initArgumentResolvers()用作PayloadArgumentResolvers.

因此,这是一个侵入性较小的更改。

于 2019-09-24T14:23:18.250 回答