0

我试图了解SqsMessageDrivenChannelAdapter解决内存问题的行为。上游系统转储了数千条消息aws-sqs-queue,所有消息都被立即接收SqsMessageDrivenChannelAdapter。在 AWS 控制台上,我看不到队列中的任何可用消息。然后SqsMessageProcesser每 5 秒处理 1 条消息。

这是日志:

2019-05-21 17:28:18 INFO SQSMessageProcessor:88 - --- sqsMessageProcesser 内部--- 2019-05-21 17:28:23 INFO SQSMessageProcessor:88 - --- sqsMessageProcesser 内部--- 2019-05- 21 17:28:28 INFO SQSMessageProcessor:88 - --- sqsMessageProcesser 内部--- 2019-05-21 17:28:33 INFO SQSMessageProcessor:88 --- sqsMessageProcesser 内部--- 2019-05-21 17:28 :38 INFO SQSMessageProcessor:88 - --- 在 sqsMessageProcesser 内--- ...................

这是否意味着在SqsMessageProcesser每 5 秒处理 1 条消息时,数千条消息被保存在 ? 的(服务器)内存中in-channel

每个数据库事务大约需要 5 秒,目前我们在 PRD 上面临“内存不足”问题。

如果我在QueueChannelandsetMaxNumberOfMessages上设置容量会有帮助SqsMessageDrivenChannelAdapter吗?如果是,是否有计算这些值的标准方法?


  @Bean(name = "in-channel")
  public PollableChannel sqsInputChannel() {
    return new QueueChannel();
  }

  @Autowired
  private AmazonSQSAsync amazonSqs;

  @Bean
  public MessageProducer sqsMessageDrivenChannelAdapterForItems() {

    SqsMessageDrivenChannelAdapter adapter =
        new SqsMessageDrivenChannelAdapter(amazonSqs, "aws-sqs-queue");
    adapter.setOutputChannelName("in-channel");
    return adapter;
  }

  @ServiceActivator(inputChannel = "in-channel",
      poller = @Poller(fixedRate = "5000" , maxMessagesPerPoll = "1"))
  public void sqsMessageProcesser(Message<?> receive) throws ProcesserException {
  logger.info("--- inside sqsMessageProcesser---")
  // db transactions.
}
4

1 回答 1

0

QueueChannel实际上,为消息驱动的通道适配器放置一个反模式。后者已经是异步的并且基于一些任务调度。因此,将消费消息从源转移到内存队列肯定会导致一些麻烦。

您应该考虑改为使用直接通道,并让 SQS 消费线程被阻塞,直到您sqsMessageProcesser完成其工作。这样,您将保证没有数据丢失。

于 2020-04-08T20:18:43.887 回答