
I am looking at the spring-integration-aws project. It provides this example of the Inbound Channel Adapter:

@SpringBootApplication
public static class MyConfiguration {

    @Autowired
    private AmazonSQSAsync amazonSqs;

    @Bean
    public PollableChannel inputChannel() {
        return new QueueChannel();
    }

    @Bean
    public MessageProducer sqsMessageDrivenChannelAdapter() {
        SqsMessageDrivenChannelAdapter adapter = new SqsMessageDrivenChannelAdapter(this.amazonSqs, "myQueue");
        adapter.setOutputChannel(inputChannel());
        return adapter;
    }
}

OK, the channel and the SqsMessageDrivenChannelAdapter are defined, but what comes next? Let's say I have a Spring bean like this:

import com.amazonaws.services.sqs.model.Message;

@Component
public class MyComponent {
    public void onMessage(Message message) throws Exception {
        //handle sqs message
    }
} 
  1. How do I tell Spring to pass all messages from myQueue to this component?
  2. Is any additional configuration needed to process messages one at a time? For example, after a message is received, SQS marks it as being processed and hides it from other clients, so I need to fetch only one message, process it, and then fetch the next one. Is this behavior enabled by default?

2 Answers


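You should read the Spring Integration Reference Manual. For the first question, annotate the handler method with @ServiceActivator and point it at the channel: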

@Component
public class MyComponent {

    @ServiceActivator(inputChannel = "inputChannel")
    public void onMessage(Message message) throws Exception {
        //handle sqs message
    }

} 
answered 2017-05-02T12:53:12.883

To answer your second question, the adapter exposes this setter:

/**
 * Configure the maximum number of messages that should be retrieved during one poll to the Amazon SQS system. This
 * number must be a positive, non-zero number that has a maximum number of 10. Values higher then 10 are currently
 * not supported by the queueing system.
 *
 * @param maxNumberOfMessages
 *      the maximum number of messages (between 1-10)
 */
public void setMaxNumberOfMessages(Integer maxNumberOfMessages) {
    this.maxNumberOfMessages = maxNumberOfMessages;
}

The default is 10, so one-by-one fetching is not enabled by default.
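
As a rough illustration of the setter above, the adapter bean from the question could be limited to one message per poll. This is only a sketch: it reuses the amazonSqs field, the "myQueue" name and the inputChannel bean from the question, the configuration class name is made up for the example, and it assumes the AWS SDK v1 based spring-integration-aws and Spring Integration jars are on the classpath. Fetching one message per poll limits how many messages are in flight, but whether that gives strict one-by-one processing also depends on how the downstream channel is consumed.

```java
import com.amazonaws.services.sqs.AmazonSQSAsync;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.aws.inbound.SqsMessageDrivenChannelAdapter;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.messaging.PollableChannel;

// Hypothetical configuration class name, used only for this sketch.
@Configuration
public class SingleMessageSqsConfiguration {

    @Autowired
    private AmazonSQSAsync amazonSqs;

    @Bean
    public PollableChannel inputChannel() {
        return new QueueChannel();
    }

    @Bean
    public MessageProducer sqsMessageDrivenChannelAdapter() {
        SqsMessageDrivenChannelAdapter adapter =
                new SqsMessageDrivenChannelAdapter(this.amazonSqs, "myQueue");
        adapter.setOutputChannel(inputChannel());
        // Ask SQS for at most one message per poll instead of the default 10.
        adapter.setMaxNumberOfMessages(1);
        return adapter;
    }
}
```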

As for the "mark them as processing" part of the question, that can be handled with the following SqsMessageDeletionPolicy option:

/**
 * Never deletes message automatically. The receiving listener method must acknowledge each message manually by using
 * the acknowledgment parameter.
 * <p><b>IMPORTANT</b>: When using this policy the listener method must take care of the deletion of the messages.
 * If not, it will lead to an endless loop of messages (poison messages).</p>
 *
 * @see Acknowledgment
 */
NEVER,

With this policy, an Acknowledgment object is placed in the AwsHeaders.ACKNOWLEDGMENT message header. You can retrieve it in your onMessage() method and acknowledge the message when needed.
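
A minimal sketch of that manual acknowledgment follows. It makes several assumptions that are not in the answer itself: the adapter has been configured with adapter.setMessageDeletionPolicy(SqsMessageDeletionPolicy.NEVER), the payload arrives as the SQS message body in a String, and the header is bound with Spring's @Header annotation. Blocking on the returned Future is a design choice for the example, not a requirement.

```java
import org.springframework.cloud.aws.messaging.listener.Acknowledgment;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.aws.support.AwsHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
public class MyComponent {

    @ServiceActivator(inputChannel = "inputChannel")
    public void onMessage(@Payload String body,
            @Header(AwsHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) throws Exception {
        // Handle the SQS message body here (assumed to be a String payload).

        // Acknowledge only after successful processing. With the NEVER policy the
        // message is deleted from the queue only when it is acknowledged; otherwise
        // it becomes visible again once its visibility timeout expires.
        acknowledgment.acknowledge().get();
    }
}
```

If processing throws before acknowledge() is called, the message is not deleted and SQS will redeliver it, which is the "poison message" risk the Javadoc above warns about.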

answered 2017-05-02T13:32:02.007