我在我的系统中使用了 activemq,我看到的是以下消息: TopicSubscription: consumer=...: Pending message cursor [org.apache.activemq.broker.region.cursors.VMPendingMessageCursor@1684f89c] 已满,临时使用( 0%) 或内存使用 (100%) 限制达到,阻塞消息 add() 等待资源释放。
这是因为如果我理解正确,我的消费者很慢,而我的生产者很快。结果是最终我的生产者被阻塞,直到消费者读取消息并释放一些内存。我想要的是我的生产者没有被阻止,并且当内存已满时,旧消息正在被删除。
鉴于我对已阅读内容的理解,以下配置应该可以解决问题(messageEvictionStrategy,pendingMessageLimitStrategy),但它对我不起作用,我不知道为什么。
出于测试原因,我指定了低内存使用限制(35Mb)以使问题出现得更快,但情况是,当问题出现时我最终需要它,以便 activemq 只删除旧消息。
我发现了一种不令人满意的解决方案,即在 ActiveMQConnectionFactory 中设置 useAsyncSend=true 并指定 sendTimeout。这使得生产者不会被阻塞,但这样最新的消息会被丢弃,而不是最旧的消息。
最后,我说的是非持久性话题。
任何帮助家伙都会完美。下面我有activemq配置
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" producerFlowControl="false" memoryLimit="35 Mb">
<pendingSubscriberPolicy>
<vmCursor />
</pendingSubscriberPolicy>
<messageEvictionStrategy>
<oldestMessageEvictionStrategy/>
</messageEvictionStrategy>
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="10"/>
</pendingMessageLimitStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<systemUsage>
<systemUsage sendFailIfNoSpace="true">
<memoryUsage>
<memoryUsage limit="35 mb"/>
</memoryUsage>
<storeUsage>
<storeUsage limit="1 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="5000 mb"/>
</tempUsage>
</systemUsage>
</systemUsage>
activemq 版本 5.7.0
我使用spring模板发送消息:
<bean class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="pooledJmsConnectionFactory"/>
<property name="timeToLive" value="100"/>
</bean>
我传输 javax.jms.ObjectMessage,大小相对较小。
我在客户前提下发现了问题我的应用程序中有很多主题,但设法从 1 个线程本地复制它,不间断的消息总是持续发送到同一个主题。发送的消息只是一个小字符串。
我只有一个生产者,当我有 1 个(或多个)慢速消费者时,问题似乎出现了——但一个慢速消费者就足够了——。如果不存在慢消费者,则不会出现问题。
我认为这没有任何区别,但我使用
<transportConnectors>
<transportConnector name="openwire" uri="nio://0.0.0.0:33029?wireFormat.maxInactivityDuration=60000&wireFormat.maxInactivityDurationInitalDelay=60000"/>
</transportConnectors>