我正在使用独立运行的 HornetQ(v2.2.13) 消费者来读取 JBOSS 服务器发布的持久主题(7.1.1 最终版)。一切顺利几个小时(2-6 之间),然后消费者停止接收来自主题的消息。从服务器上的日志文件中,我看到数据不断从管道中抽出,但消费者日志文件表明客户端停止读取数据。我从客户端推断出它最后一次从主题中读取消息的时间是 12:00:00,而服务器日志说它最后一次向主题推送消息的时间是 14:00:00。
我尝试过调整 HornetQ 的配置,但它似乎并没有持续持续的时间。
我用来与主题交流的代码如下。
private TransportConfiguration getTC(String hostname) {
Map<String,Object> params = new HashMap<String, Object>();
params.put(TransportConstants.HOST_PROP_NAME, hostname);
params.put(TransportConstants.PORT_PROP_NAME, 5445);
TransportConfiguration tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params);
return tc;
}
private Topic createDestination(String destinationName) {
Topic topic = new HornetQTopic(destinationName);
return topic;
}
private HornetQConnectionFactory createCF(TransportConfiguration tc) {
HornetQConnectionFactory cf = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType .CF, tc);
return cf == null ? null : cf;
}
创建会话并启动它的代码片段:
TransportConfiguration tc = this.getTC(this.hostname);
HornetQConnectionFactory cf = this.createCF(tc);
cf.setRetryInterval(4000);
cf.setReconnectAttempts(10);
cf.setConfirmationWindowSize(1000000);
Destination destination = this.createDestination(this.topicName);
logger.info("Starting Topic Connection");
try {
this.connection = cf.createConnection();
connection.start();
this.session = connection.createSession(transactional, ackMode);
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(this);
logger.info("Started topic connection");
} catch (Exception ex) {
ex.printStackTrace();
logger.error("EXCEPTION!");
}