3

我正在尝试设置一个集成流来使用来自亚马逊 sqs 队列的消息,并且到目前为止它工作正常。但我想调整每分钟或每秒的消息数量。例如每分钟 20 条消息。

这是我的 sql listener bean 的定义

    @Bean
    public MessageProducer mySqsMessageDrivenChannelAdapter() {
        SqsMessageDrivenChannelAdapter adapter = new SqsMessageDrivenChannelAdapter(this.amazonSqs, queueName);
        adapter.setMessageDeletionPolicy(SqsMessageDeletionPolicy.ON_SUCCESS);

        adapter.setVisibilityTimeout(TIMEOUT_VISIBILITY);
        adapter.setWaitTimeOut(TIMEOUT_MESSAGE_WAIT);
        adapter.setMaxNumberOfMessages(prefetch);
        adapter.setOutputChannel(processMessageChannel());
        return adapter;
    }

如您所见,我设置了每次轮询要获取的最大消息数,但是如何设置轮询之间的延迟?

在常规 jms 队列中,我可以使用 JMS.inboundAdapter 使用自定义轮询器,但似乎使用 SqsMessageDrivenChannelAdapter 我无法设置任何轮询计时器值。

也许我可以使用除 SqsMessageDrivenChannelAdapter 之外的 MessageProducer,但哪一个?

是否可以使用 sqs 设置 JMS.inboundAdapter?

4

1 回答 1

1

Spring IntegrationSqsMessageDrivenChannelAdapter是一个消息驱动主动组件。它基于SimpleMessageListenerContainerSpringh Cloud AWS 项目,该项目具有长时间运行的while()循环调用AmazonSQS.receiveMessage()。该循环中的逻辑并不太复杂:

try {
    ReceiveMessageResult receiveMessageResult = getAmazonSqs().receiveMessage(this.queueAttributes.getReceiveMessageRequest());
    CountDownLatch messageBatchLatch = new CountDownLatch(receiveMessageResult.getMessages().size());
    for (Message message : receiveMessageResult.getMessages()) {
        if (isQueueRunning()) {
            MessageExecutor messageExecutor = new MessageExecutor(this.logicalQueueName, message, this.queueAttributes);
                 getTaskExecutor().execute(new SignalExecutingRunnable(messageBatchLatch, messageExecutor));
        } else {
           messageBatchLatch.countDown();
        }
    }
    try {
         messageBatchLatch.await();
    } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
    }
} catch (Exception e) {

如您所见,我们在那里创建messageBatchLatch并在循环后等待它。每条消息都是由它们自己SignalExecutingRunnable处理countDown()MessageExecutor。因此,您想要做的可能是通过Thread.sleep()在目标服务方法中人为实现,以便在 SQS 轮询之间有更多的间隔。

但是我听到了您的要求,我们确实必须添加以下内容:

/**
 * The sleep interval in milliseconds used in the main loop between shards polling cycles.
 * Defaults to {@code 1000} minimum {@code 250}.
 * @param idleBetweenPolls the interval to sleep between shards polling cycles.
 */
public void setIdleBetweenPolls(int idleBetweenPolls) {
    this.idleBetweenPolls = Math.max(250, idleBetweenPolls);
}

KinesisMessageDrivenChannelAdapterSimpleMessageListenerContainer.

于 2017-10-20T16:00:27.253 回答