4

我在我的系统中使用了 activemq,我看到的是以下消息: TopicSubscription: consumer=...: Pending message cursor [org.apache.activemq.broker.region.cursors.VMPendingMessageCursor@1684f89c] 已满,临时使用( 0%) 或内存使用 (100%) 限制达到,阻塞消息 add() 等待资源释放。

这是因为如果我理解正确,我的消费者很慢,而我的生产者很快。结果是最终我的生产者被阻塞,直到消费者读取消息并释放一些内存。我想要的是我的生产者没有被阻止,并且当内存已满时,旧消息正在被删除。

鉴于我对已阅读内容的理解,以下配置应该可以解决问题(messageEvictionStrategy,pendingMessageLimitStrategy),但它对我不起作用,我不知道为什么。

出于测试原因,我指定了低内存使用限制(35Mb)以使问题出现得更快,但情况是,当问题出现时我最终需要它,以便 activemq 只删除旧消息。

我发现了一种不令人满意的解决方案,即在 ActiveMQConnectionFactory 中设置 useAsyncSend=true 并指定 sendTimeout。这使得生产者不会被阻塞,但这样最新的消息会被丢弃,而不是最旧的消息。

最后,我说的是非持久性话题。

任何帮助家伙都会完美。下面我有activemq配置

        <destinationPolicy>
        <policyMap>
            <policyEntries>
                <policyEntry topic=">" producerFlowControl="false" memoryLimit="35 Mb">
               <pendingSubscriberPolicy>
                   <vmCursor />
                </pendingSubscriberPolicy>
                    <messageEvictionStrategy>
                        <oldestMessageEvictionStrategy/>
                    </messageEvictionStrategy>
                    <pendingMessageLimitStrategy>
                        <constantPendingMessageLimitStrategy limit="10"/>
                    </pendingMessageLimitStrategy>
                </policyEntry>
            </policyEntries>
        </policyMap>
    </destinationPolicy>

    <systemUsage>
        <systemUsage sendFailIfNoSpace="true">
            <memoryUsage>
                <memoryUsage limit="35 mb"/>
            </memoryUsage>
            <storeUsage>
                <storeUsage limit="1 gb"/>
            </storeUsage>
            <tempUsage>
                <tempUsage limit="5000 mb"/>
            </tempUsage>
        </systemUsage>
    </systemUsage>  

activemq 版本 5.7.0

我使用spring模板发送消息:

    <bean class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="pooledJmsConnectionFactory"/>
    <property name="timeToLive" value="100"/>
</bean>

我传输 javax.jms.ObjectMessage,大小相对较小。

我在客户前提下发现了问题我的应用程序中有很多主题,但设法从 1 个线程本地复制它,不间断的消息总是持续发送到同一个主题。发送的消息只是一个小字符串。

我只有一个生产者,当我有 1 个(或多个)慢速消费者时,问题似乎出现了——但一个慢速消费者就足够了——。如果不存在慢消费者,则不会出现问题。

我认为这没有任何区别,但我使用

       <transportConnectors>
       <transportConnector name="openwire" uri="nio://0.0.0.0:33029?wireFormat.maxInactivityDuration=60000&amp;wireFormat.maxInactivityDurationInitalDelay=60000"/>
    </transportConnectors>
4

2 回答 2

2

我怎样才能重新创建这个?这个话题有多少生产者/消费者?难道只有一个话题吗?

您的设置看起来不错,但您不需要在策略上设置 memoryLimit=35mb。将其设置为与整体系统使用相同是没有意义的。这个想法是所有主题组合的内存限制将等于系统内存限制。例如,如果您有两个主题,每个主题将使用 35MB (2 * 35 == 70MB),这将超过整个系统内存设置。我不认为这是你所看到的具体原因,但要记住一些事情。

这也是什么版本的 ActiveMQ?如果您已经编写了一些可以产生此结果的测试,请告诉我。

于 2013-11-06T17:25:41.563 回答
1

事实证明,当使用 JmsTemplate 以异步发送然后无法传递的消息时,我们需要启用 explicitQosEnabled 并设置 deliveryMode=1(非持久性)。同样在客户端,消费者需要有一个更小的预取限制

服务器

<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="pooledJmsConnectionFactory"/>
    <property name="explicitQosEnabled" value="true"/>
    <property name="deliveryMode" value="1"/>
</bean>

客户

<jms:listener-container .. prefetch="1000">
...
</jms:listener-container>

不要问我为什么……但这似乎解决了我的问题。基本上不是 100% 需要,但如果有人能向我解释这将是完美的

于 2013-11-12T08:40:42.677 回答