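I created a session with one consumer in HornetQ, then used a producer to add 4 messages to the queue. After that, I created a new consumer.
Will this new consumer know about the old messages?
If not, is it possible to configure this in XML?
The new consumer I created cannot get the earlier messages. I just want to confirm whether this behavior is correct. I could not find any help in the documentation.
Below is the code snippet: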
TextMessage receivedMessage = (TextMessage)consumer.receive();
receivedMessage.acknowledge();
System.out.println("Got order: " + receivedMessage.getText());
//consumer.close();
MessageConsumer newConsumer = session.createConsumer(orderQueue);
receivedMessage = (TextMessage)newConsumer.receive();
receivedMessage.acknowledge();
System.out.println("Got order: " + receivedMessage.getText());
If I uncomment the consumer.close() line, it works fine.
My hornetq-jms.xml:
<connection-factory name="NettyConnectionFactory">
   <xa>true</xa>
   <connectors>
      <connector-ref connector-name="netty"/>
   </connectors>
   <entries>
      <entry name="/XAConnectionFactory"/>
   </entries>
   <consumer-window-size>0</consumer-window-size>
</connection-factory>
<connection-factory name="NettyConnectionFactory">
   <xa>false</xa>
   <connectors>
      <connector-ref connector-name="netty"/>
   </connectors>
   <entries>
      <entry name="/ConnectionFactory"/>
   </entries>
   <consumer-window-size>0</consumer-window-size>
</connection-factory>
<connection-factory name="NettyThroughputConnectionFactory">
   <xa>true</xa>
   <connectors>
      <connector-ref connector-name="netty-throughput"/>
   </connectors>
   <entries>
      <entry name="/XAThroughputConnectionFactory"/>
   </entries>
   <consumer-window-size>0</consumer-window-size>
</connection-factory>
<connection-factory name="NettyThroughputConnectionFactory">
   <xa>false</xa>
   <connectors>
      <connector-ref connector-name="netty-throughput"/>
   </connectors>
   <entries>
      <entry name="/ThroughputConnectionFactory"/>
   </entries>
   <consumer-window-size>0</consumer-window-size>
</connection-factory>
Connection factory code snippet:
TransportConfiguration transportConfiguration =
    new TransportConfiguration(NettyConnectorFactory.class.getName());
HornetQConnectionFactory cf =
    HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, getTransportConfiguration());
Queue orderQueue = HornetQJMSClient.createQueue("MutationPipelineQueue");
Code for getTransportConfiguration():
private synchronized static TransportConfiguration getTransportConfiguration() {
    HashMap<String, TransportConfiguration> transportConfigurationMap = new HashMap<String, TransportConfiguration>();
    TransportConfiguration tc = transportConfigurationMap.get("machinename:5455");
    if (tc == null) {
        Map<String, Object> connectionParams = new HashMap<String, Object>();
        connectionParams.put(org.hornetq.core.remoting.impl.netty.TransportConstants.HOST_PROP_NAME, "machinename");
        connectionParams.put(org.hornetq.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, Integer.valueOf("5455"));
        tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), connectionParams);
        transportConfigurationMap.put("machinename:5455", tc);
    }
    return tc;