我正在尝试使用 Active MQ 覆盖消息来实现延迟队列。
每条消息都计划以 x 的延迟(比如 60 秒)传递
在两者之间,如果再次收到相同的消息,它应该覆盖以前的消息。
因此,即使我在 x 秒内收到 10 条消息。只应处理一条消息。
有没有干净的方法来做到这一点?
我正在尝试使用 Active MQ 覆盖消息来实现延迟队列。
每条消息都计划以 x 的延迟(比如 60 秒)传递
在两者之间,如果再次收到相同的消息,它应该覆盖以前的消息。
因此,即使我在 x 秒内收到 10 条消息。只应处理一条消息。
有没有干净的方法来做到这一点?
这个问题有两个部分需要分别解决:
消息可以在 ActiveMQ 中延迟吗?
是 - 请参阅延迟和安排消息传递。您需要<broker ... schedulerSupport="true">
在 ActiveMQ 配置中进行设置,并设置AMQ_SCHEDULED_DELAY
JMS 消息的属性,说明您希望消息延迟多长时间(在您的情况下为 10000)。
有什么办法可以防止同一条消息被多次消费?
是的,但这是一个应用程序问题,而不是 ActiveMQ 问题。它通常被称为重复数据删除或幂等消耗。如果您只有一个消费者,最简单的方法是跟踪在地图中收到的消息,并检查该地图是否收到消息。它已经被看到,丢弃。
对于更复杂的用例,您在不同的机器上有多个消费者,或者您希望该状态在应用程序重新启动后仍然存在,您需要在数据库中保留一个消息表,并每次查询它。
如果有帮助,请投票给这个答案,因为它鼓励人们帮助你。
同样根据 ActiveMQ BrokerService 类中的方法,您应该配置持久性以能够使用调度程序功能。
public boolean isSchedulerSupport() {
return this.schedulerSupport && (isPersistent() || jobSchedulerStore != null);
}
您可以使用位于 activemq 主目录的 conf 目录中的 activemq.xml 文件中的以下条目配置 activemq 代理以启用“schedulerSupport”。
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">
您可以在配置中覆盖 BrokerService
@Configuration
@EnableJms
public class JMSConfiguration {
@Bean
public BrokerService brokerService() throws Exception {
BrokerService brokerService = new BrokerService();
brokerService.setSchedulerSupport(true);
return brokerService;
}
}