几天来,我一直在网上浏览一些文档和帖子,但无法正确配置它。
我正在独立环境中工作,我正在尝试将 HornetQ 简单地集成到我的 Spring3.1 应用程序中。
我知道我还必须修改几个 Hornetq 特定的 xml。
我的谜题中似乎总是缺少一些东西。
任何完整的简单工作样本?
非常感谢,雷。
您是使用 HornetQ 作为嵌入式 JMS 服务器还是将其与 JBoss 一起使用?在这两种情况下,它都会有 2 个不同的答案。尼古拉斯回答与嵌入式案例有关。
我假设您使用的是集成在 Jboss 中的 HornetQ,因为您说您在独立环境中工作,所以我建议您不要在 Spring 中集成 JMS,而是使用 EE 功能,如 EJB 和 MDB。它很容易配置并且已经存在。您只需要正确配置它,为消费者创建 MDB 并为初始化 spring 配置创建 @Singleton EJB。
hornetq-configuration.xml:
<configuration xmlns="urn:hornetq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:hornetq/schema/hornetq-configuration.xsd">
<persistence-enabled>false</persistence-enabled>
<create-bindings-dir>true</create-bindings-dir>
<create-journal-dir>true</create-journal-dir>
<!--<create-paging-dir>true</create-paging-dir>-->
<journal-directory>/tmp/journal</journal-directory>
<paging-directory>/tmp/paging</paging-directory>
<bindings-directory>/tmp/binding</bindings-directory>
<!-- disable security -->
<security-enabled>false</security-enabled>
<!-- Connectors -->
<connectors>
<connector name="in-vm">
<factory-class>org.hornetq.core.remoting.impl.invm.InVMConnectorFactory</factory-class>
</connector>
</connectors>
<acceptors>
<acceptor name="in-vm">
<factory-class>org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory</factory-class>
</acceptor>
</acceptors>
<!-- Other config -->
<!--<security-settings>-->
<!--<!–security for example queue–>-->
<!--<security-setting match="#">-->
<!--<permission type="createDurableQueue" roles="guest"/>-->
<!--<permission type="deleteDurableQueue" roles="guest"/>-->
<!--<permission type="createNonDurableQueue" roles="guest"/>-->
<!--<permission type="deleteNonDurableQueue" roles="guest"/>-->
<!--<permission type="consume" roles="guest"/>-->
<!--<permission type="send" roles="guest"/>-->
<!--</security-setting>-->
<!--</security-settings>-->
</configuration>
hornetq-jms.xml:
<configuration xmlns="urn:hornetq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
<connection-factory name="ConnectionFactory">
<connectors>
<connector-ref connector-name="in-vm"/>
</connectors>
<entries>
<entry name="ConnectionFactory"/>
</entries>
<consumer-window-size>0</consumer-window-size>
<retry-interval>1000</retry-interval>
<retry-interval-multiplier>1.5</retry-interval-multiplier>
<max-retry-interval>60000</max-retry-interval>
<reconnect-attempts>1000</reconnect-attempts>
</connection-factory>
<!--the queue used by the example-->
<queue name="statusQueue">
<entry name="queue/statusQueue"/>
</queue>
</configuration>
如果您启用安全性,您还应该使用用户和角色描述创建 hornetq-users.xml。
弹簧配置:
<bean id="jmsServer" class="org.hornetq.jms.server.embedded.EmbeddedJMS" init-method="start" destroy-method="stop"/>
<bean id="jmsConnectionFactory"
class="mypackecge.JmsConnecitonFactoryLocator"
depends-on="jmsServer"
factory-method="lookupConnectionFactory">
<constructor-arg name="server" ref="jmsServer"/>
</bean>
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate" depends-on="jmsServer">
<property name="connectionFactory">
<ref bean="jmsConnectionFactory"/>
</property>
<property name="destinationResolver">
<ref bean="jmsDestResolver"/>
</property>
</bean>
<bean id="jmsDestResolver" class="mypackage.EmbeddedDestinationResolver">
<property name="server" ref="jmsServer"/>
</bean>
<bean id="statusQueue" class="mypackage.JmsQueueLocator"
depends-on="jmsServer"
factory-method="lookupQueue">
<constructor-arg name="server" ref="jmsServer"/>
<constructor-arg name="queueName" value="queue/statusQueue"/>
</bean>
<bean id="jmsContainerStatus" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="jmsConnectionFactory"/>
<property name="destination" ref="statusQueue"/>
<property name="messageListener" ref="statusChecker" />
<property name="concurrentConsumers" value="5"/>
</bean>
接下来,为查找 jms sessionFacroty 和队列创建类。
目标解析器,由 jmsTeplate 用于解析发送消息的位置。
public class EmbeddedDestinationResolver implements DestinationResolver {
private EmbeddedJMS server;
public EmbeddedJMS getServer() {
return server;
}
public void setServer(EmbeddedJMS server) {
this.server = server;
}
@Override
public Destination resolveDestinationName(Session session, String destinationName, boolean pubSubDomain) throws JMSException {
return (Destination)server.lookup(destinationName);
}
}
连接工厂定位器。
public class JmsConnecitonFactoryLocator {
public static HornetQJMSConnectionFactory lookupConnectionFactory(EmbeddedJMS server){
HornetQJMSConnectionFactory cf = (HornetQJMSConnectionFactory) server.lookup("ConnectionFactory");
return cf;
}
}
队列定位器。
public class JmsQueueLocator {
public static Queue lookupQueue(EmbeddedJMS server, String queueName){
return (Queue) server.lookup(queueName);
}
}
消息处理程序。
@Component(value = "statusChecker")
public class StatusChecker implements MessageListener {
private static final Logger logger = LoggerFactory.getLogger(StatusChecker.class);
@Override
public void onMessage(Message message) {
// handle message, call message.acknowledge()
}
和 jmsTemplate 示例,用于在队列中发送消息:
@Component
public class WorkQueue {
private JmsTemplate jmsTemplate;
public JmsTemplate getJmsTemplate() {
return jmsTemplate;
}
@Autowired
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
public void addStatusRequestToQueue(final Transaction t) throws JMSException {
jmsTemplate.send("queue/statusQueue", new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage msg = session.createTextMessage();
msg.setText(String.valueOf(t.getId()));
return msg;
}
});
}