11

我阅读了有关的文档prefetch buffer。根据我的理解,如果我将Prefetch值 =1 分配给消费者 A。Activemq 一次向 A 推送 1 条消息。一旦 A 向 activemq 发送确认,那么只有 activemq 向 A 推送另一条消息。

我的疑问是,我需要在哪里为消费者分配预取值。

我需要在消费者程序中分配预取值吗?如果是正确的,你能用简单的代码解释一下吗?

谢谢。

4

2 回答 2

12

根据ActiveMQ 手册

ActiveMQ对可以在任何时间点将多少消息流式传输给消费者使用预取限制。一旦达到预取限制,就不会再向消费者发送消息,直到消费者开始发送回消息确认(以指示消息已被处理)。可以基于每个消费者指定实际的预取限制值。

要更改所有消费者类型的预取大小,您可以使用类似于以下内容的连接 URI:

tcp://localhost:61616?jms.prefetchPolicy.all=50

要仅更改队列使用者类型的预取大小,您将使用类似于以下内容的连接 URI:

tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1

它也可以使用目标选项在每个消费者的基础上进行配置。

queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10");
consumer = session.createConsumer(queue);
于 2013-09-05T11:13:00.697 回答
2

虽然它是旧线程。

如果您使用带有 ActiveMQ 集成的 spring

<!-- A connection to ActiveMQ -->
    <bean id="orderStatusAmqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL">
         <value>tcp://localhost:61616</value>
        </property>
        <property name="prefetchPolicy" ref="prefetchPolicy" /> 
         <property name="optimizeAcknowledge" value="true" /> 
         <property name="useAsyncSend" value="true" />
         <property name="trustedPackages">
        <list>
            <value>com.myapp.tradingplatform</value>
            <value>java</value>
        </list>
    </property>
    </bean>

<bean id="prefetchPolicy" class="org.apache.activemq.ActiveMQPrefetchPolicy">
    <property name="queuePrefetch" value="1000" />
</bean>
    <!-- A cached connection to wrap the ActiveMQ connection -->
    <bean id="orderStatusCachedConnectionFactory"
        class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory">
            <ref bean="orderStatusAmqConnectionFactory" />
        </property>
        <property name="sessionCacheSize">
            <value>100</value>
        </property>
    </bean>
于 2018-02-09T11:17:26.263 回答