我阅读了有关的文档prefetch buffer
。根据我的理解,如果我将Prefetch
值 =1 分配给消费者 A。Activemq 一次向 A 推送 1 条消息。一旦 A 向 activemq 发送确认,那么只有 activemq 向 A 推送另一条消息。
我的疑问是,我需要在哪里为消费者分配预取值。
我需要在消费者程序中分配预取值吗?如果是正确的,你能用简单的代码解释一下吗?
谢谢。
根据ActiveMQ 手册:
ActiveMQ对可以在任何时间点将多少消息流式传输给消费者使用预取限制。一旦达到预取限制,就不会再向消费者发送消息,直到消费者开始发送回消息确认(以指示消息已被处理)。可以基于每个消费者指定实际的预取限制值。
要更改所有消费者类型的预取大小,您可以使用类似于以下内容的连接 URI:
tcp://localhost:61616?jms.prefetchPolicy.all=50
要仅更改队列使用者类型的预取大小,您将使用类似于以下内容的连接 URI:
tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1
它也可以使用目标选项在每个消费者的基础上进行配置。
queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10");
consumer = session.createConsumer(queue);
虽然它是旧线程。
如果您使用带有 ActiveMQ 集成的 spring
<!-- A connection to ActiveMQ -->
<bean id="orderStatusAmqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL">
<value>tcp://localhost:61616</value>
</property>
<property name="prefetchPolicy" ref="prefetchPolicy" />
<property name="optimizeAcknowledge" value="true" />
<property name="useAsyncSend" value="true" />
<property name="trustedPackages">
<list>
<value>com.myapp.tradingplatform</value>
<value>java</value>
</list>
</property>
</bean>
<bean id="prefetchPolicy" class="org.apache.activemq.ActiveMQPrefetchPolicy">
<property name="queuePrefetch" value="1000" />
</bean>
<!-- A cached connection to wrap the ActiveMQ connection -->
<bean id="orderStatusCachedConnectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory">
<ref bean="orderStatusAmqConnectionFactory" />
</property>
<property name="sessionCacheSize">
<value>100</value>
</property>
</bean>