0

我构建了一个基于 JBoss 7.1.1 和外部 HornetQ 实现 2.2.1.4 的 Spring JMS 解决方案。这连接并成功运行。

但是,我现在使用的是 EAP6,并试图连接到封装在 EAP6 中的内部 HornetQ。

我有一些负责管理连接和创建队列的类。但是这些类似乎都无法连接到 EAP6 内置(打包)的 HornetQ,而连接到外部 HornetQ 则完全正常。

我已经向 Redhat 提出了这个问题,他们不确定如何解决,因为这也需要 Spring 编码。

我遇到的问题是,我相信我需要创建一个 QueueConnection,如 QueueConnection qcon = queueConnectionFactory.createQueueConnection("user","password");

但是我们在 Spring 中的实现方式是使用 Spring JmsTemplate,而 JmsTemplate 只接受 ConnectionFactory,没有可以传入 QueueConnection 的地方,所以这种做法行不通。

下面是包含所需 Spring bean的jms-services.xml文件:

<?xml version="1.0" encoding="UTF-8"?>
<!--
    Spring application context for the JMS layer. It wires the JTA transaction
    manager, the project's HornetQ connection-factory wrapper and the queue manager
    that exposes a JmsTemplate.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:task="http://www.springframework.org/schema/task" xmlns:tx="http://www.springframework.org/schema/tx"
    xsi:schemaLocation="http://www.springframework.org/schema/beans 
                        http://www.springframework.org/schema/beans/spring-beans.xsd
                        http://www.springframework.org/schema/context 
                        http://www.springframework.org/schema/context/spring-context.xsd
                        http://www.springframework.org/schema/task 
                        http://www.springframework.org/schema/task/spring-task.xsd
                        http://www.springframework.org/schema/tx
                        http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">

    <!-- Enable annotation-based configuration on registered beans. -->
    <context:annotation-config />

    <!-- Scan the project package for annotated components. -->
    <context:component-scan base-package="com.myproject.test" />

    <!-- Enable annotation-driven task execution. -->
    <task:annotation-driven />

    <!-- JTA transaction manager looked up by name at java:/TransactionManager;
         UserTransaction auto-detection is switched off. -->
    <bean id="testTransactionManager"
        class="org.springframework.transaction.jta.JtaTransactionManager">
        <property name="transactionManagerName" value="java:/TransactionManager"></property>
        <property name="autodetectUserTransaction" value="false"></property>
    </bean>

    <!-- Use the transaction manager above for annotation-driven transactions. -->
    <tx:annotation-driven transaction-manager="testTransactionManager" />

    <!-- Project wrapper that builds the HornetQ ConnectionFactory. Because
         useCluster is true, QueueConnectionFactoryImpl passes host/port to a
         DiscoveryGroupConfiguration instead of a Netty connector; useJta=true
         selects the XA factory type. No user name or password is supplied here. -->
    <bean id="queueConnectionFactory" class="com.myproject.test.impl.QueueConnectionFactoryImpl">
        <constructor-arg type="String" name="host" value="231.7.7.7" />
        <constructor-arg type="int" name="port" value="9876" />
        <constructor-arg type="boolean" name="useJta" value="true" />
        <constructor-arg type="boolean" name="useCluster" value="true" />
    </bean>

    <!-- Queue manager that creates a JmsTemplate on top of the wrapper above and
         uses "TestQueue" as the template's default destination. -->
    <bean id="testQueueManager" class="com.myproject.test.impl.QueueManagerImpl">
        <constructor-arg ref="queueConnectionFactory" />
        <constructor-arg name="queue" value="TestQueue" />
    </bean>

</beans>

这是我的QueueConnectionFactoryImpl类:

package com.myproject.test.impl;

import java.util.HashMap;
import java.util.Map;

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;

import org.hornetq.api.core.DiscoveryGroupConfiguration;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.api.jms.JMSFactoryType;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.jboss.logging.Logger;

import com.myproject.test.QueueConnectionFactory;

/**
 * Builds and holds the HornetQ {@link ConnectionFactory} used by the queue managers.
 *
 * <p>Depending on the constructor used, the factory is created either from a single
 * Netty connector ({@code host}/{@code port}) without HA, or from a discovery group
 * ({@code host}/{@code port} passed to {@code DiscoveryGroupConfiguration}) with HA.
 * When {@code useJta} is {@code true} the factory type is {@code XA_CF}, otherwise
 * {@code CF}.
 *
 * <p>Note that {@link #setHost}, {@link #setPort} and {@link #setUseJta} only update
 * the stored values; they do not rebuild an already created connection factory.
 */
public class QueueConnectionFactoryImpl implements QueueConnectionFactory {

    /** Created once per instance instead of being re-created in every code path. */
    private final Logger logger = Logger.getLogger(this.getClass());

    private String host;
    private int port;
    private ConnectionFactory connectionFactory;
    private boolean useJta = false;

    /**
     * Creates a non-HA, non-XA connection factory for a single Netty connector.
     *
     * @param host host of the HornetQ acceptor
     * @param port port of the HornetQ acceptor
     */
    public QueueConnectionFactoryImpl(String host, int port) {
        createConnection(host, port);
    }

    /**
     * Creates a non-HA connection factory for a single Netty connector.
     *
     * @param host   host of the HornetQ acceptor
     * @param port   port of the HornetQ acceptor
     * @param useJta {@code true} to create an XA connection factory
     */
    public QueueConnectionFactoryImpl(String host, int port, boolean useJta) {
        this.useJta = useJta;
        createConnection(host, port);
    }

    /**
     * Creates a connection factory, either via discovery group (HA) or via a single
     * Netty connector (no HA).
     *
     * @param host       connector host, or discovery group address when {@code useCluster} is set
     * @param port       connector port, or discovery group port when {@code useCluster} is set
     * @param useJta     {@code true} to create an XA connection factory
     * @param useCluster {@code true} to use a discovery group with HA
     */
    public QueueConnectionFactoryImpl(String host, int port, boolean useJta, boolean useCluster) {
        this.useJta = useJta;
        if (useCluster) {
            createClusterConnection(host, port);
        } else {
            createConnection(host, port);
        }
    }

    /**
     * Wraps an externally supplied connection factory.
     *
     * <p>The argument is stored when it is a {@link ConnectionFactory}; previously it
     * was only logged and then discarded, which left {@link #getConnectionFactory()}
     * returning {@code null}. Any other object is ignored with a warning.
     *
     * @param connectionFactory the object to use as connection factory
     */
    public QueueConnectionFactoryImpl(Object connectionFactory) {
        logger.debug("Object is: " + connectionFactory);
        if (connectionFactory instanceof ConnectionFactory) {
            this.connectionFactory = (ConnectionFactory) connectionFactory;
        } else {
            logger.warn("Ignoring object that is not a javax.jms.ConnectionFactory: " + connectionFactory);
        }
    }

    /** Returns the HornetQ factory type matching the current {@code useJta} flag. */
    private JMSFactoryType factoryType() {
        return useJta ? JMSFactoryType.XA_CF : JMSFactoryType.CF;
    }

    /** Creates a connection factory without HA for one Netty connector. */
    private void createConnection(String host, int port) {
        this.host = host;
        this.port = port;

        Map<String, Object> connectionParams = new HashMap<String, Object>();
        connectionParams.put(TransportConstants.PORT_PROP_NAME, port);
        connectionParams.put(TransportConstants.HOST_PROP_NAME, host);

        TransportConfiguration transportConfiguration =
                new TransportConfiguration(NettyConnectorFactory.class.getName(), connectionParams);

        connectionFactory = (ConnectionFactory) HornetQJMSClient
                .createConnectionFactoryWithoutHA(factoryType(), transportConfiguration);
    }

    /** Creates a connection factory with HA from a discovery group address and port. */
    private void createClusterConnection(String host, int port) {
        this.host = host;
        this.port = port;

        connectionFactory = (ConnectionFactory) HornetQJMSClient
                .createConnectionFactoryWithHA(new DiscoveryGroupConfiguration(host, port), factoryType());
    }

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    /** @return the wrapped connection factory, or {@code null} if none was created or supplied */
    public ConnectionFactory getConnectionFactory() {
        return connectionFactory;
    }

    public void setConnectionFactory(ConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    public boolean isUseJta() {
        return useJta;
    }

    public void setUseJta(boolean useJta) {
        this.useJta = useJta;
    }

}

这是我的QueueManagerImpl代码

package com.myproject.test.impl;

import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Queue;

import org.apache.log4j.Logger;
import org.hornetq.api.jms.HornetQJMSClient;
import org.springframework.jms.core.JmsTemplate;

import com.myproject.test.QueueManager;

/**
 * Owns a {@link JmsTemplate} configured against the connection factory supplied by a
 * {@code QueueConnectionFactory}, optionally with a default HornetQ queue.
 *
 * <p>The template uses explicit QoS with persistent delivery; sessions are marked
 * transacted when the supplied factory reports JTA usage. Configuration failures are
 * logged rather than propagated, so construction itself does not throw on them.
 */
public class QueueManagerImpl implements QueueManager {

    private String queue;
    private ConnectionFactory connectionFactory;
    private JmsTemplate template;
    private Queue jmsQueue;
    private boolean useJta = false;
    private static final Logger log = Logger.getLogger(QueueManagerImpl.class);

    /**
     * Creates the manager and configures its {@link JmsTemplate}.
     *
     * @param queueConnectionFactory wrapper providing the JMS connection factory and the JTA flag
     */
    public QueueManagerImpl(QueueConnectionFactory queueConnectionFactory) {

        template = new JmsTemplate();
        connectionFactory = queueConnectionFactory.getConnectionFactory();
        try {
            // Assign the field directly: calling the overridable setter from a
            // constructor would run subclass code on a half-built object.
            this.useJta = queueConnectionFactory.isUseJta();
            template.setConnectionFactory(connectionFactory);
            template.setExplicitQosEnabled(true);
            template.setDeliveryMode(DeliveryMode.PERSISTENT);
            if (this.useJta) {
                template.setSessionTransacted(true);
            }
        } catch (Exception ex) {
            logFailure(ex);
        }
    }

    /**
     * Creates the manager and sets the default destination of its template.
     *
     * @param queueConnectionFactory wrapper providing the JMS connection factory and the JTA flag
     * @param queue                  name of the queue used as default destination
     */
    public QueueManagerImpl(QueueConnectionFactory queueConnectionFactory, String queue) {

        this(queueConnectionFactory);
        setQueue(queue);
    }

    public String getQueue() {

        return queue;
    }

    /**
     * Makes the named HornetQ queue the template's default destination. The stored
     * queue name is only updated when that succeeds; failures are logged.
     *
     * @param queue name of the queue
     */
    public void setQueue(String queue) {

        try {
            jmsQueue = HornetQJMSClient.createQueue(queue);
            template.setDefaultDestination(jmsQueue);
            this.queue = queue;
        } catch (Exception ex) {
            logFailure(ex);
        }
    }

    public JmsTemplate getTemplate() {
        return template;
    }

    /**
     * Logs a connection error.
     *
     * <p>The outer format string previously had no placeholder, so the details were
     * silently dropped and only {@code "error..."} was logged.
     *
     * @param error description of the failure
     */
    public void logError(String error) {
        log.error(buildErrorMessage(error));
    }

    /** Logs a failure together with its stack trace instead of only its string form. */
    private void logFailure(Exception ex) {
        log.error(buildErrorMessage(ex.toString()), ex);
    }

    /** Builds the log text for a failure description. */
    private static String buildErrorMessage(String error) {
        String details = String.format("Unable to connect to queue, details: %s ", error);
        return String.format("error... %s", details);
    }

    @Override
    public boolean isUseJta() {
        return useJta;
    }

    @Override
    public void setUseJta(boolean useJta) {
        this.useJta = useJta;
    }
}

关键在于,上面的代码需要把一个 ConnectionFactory 对象传递给 QueueManagerImpl 中的 JmsTemplate——即 template.setConnectionFactory(connectionFactory);。

我已经尝试了几种方法来让它工作:

1) 将以下内容添加到 jms-services.xml 文件中:

<!-- Attempt 1: wrap the connection factory in Spring's credentials adapter so that a
     fixed user name and password are applied.
     NOTE(review): targetConnectionFactory refers to the queueConnectionFactory bean,
     which is a QueueConnectionFactoryImpl (the project wrapper), not a
     javax.jms.ConnectionFactory. -->
<bean id="myConnectionFactory" class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter">
    <property name="targetConnectionFactory" ref="queueConnectionFactory"/>
    <property name="username" value="myuser"/>
    <property name="password" value="myuser123"/>
</bean>     

这会产生以下异常:

java.lang.IllegalStateException:无法将类型 [com.myproject.test.impl.QueueConnectionFactoryImpl] 的值转换为属性“targetConnectionFactory”所需的类型 [javax.jms.ConnectionFactory]:找不到匹配的编辑器或转换策略

2) 将 QueueConnectionFactoryImpl 中的连接更改为:

// Attempt 2: build the HornetQ connection factory as before, but keep it typed as
// HornetQConnectionFactory.
org.hornetq.jms.client.HornetQConnectionFactory HQConnectionFactory = (HornetQConnectionFactory) HornetQJMSClient.createConnectionFactoryWithoutHA(jmsFType, transportConfiguration);

try {
    // NOTE(review): createConnection(user, password) yields a connection object,
    // which is then cast to ConnectionFactory and assigned to the factory field.
    connectionFactory = (ConnectionFactory) HQConnectionFactory.createConnection("myuser","myuser123");
} catch (JMSException e) {
    // TODO: handle the failure properly; for now the stack trace is only printed.
    e.printStackTrace();
}

这也不起作用。我得到一个异常:

java.lang.ClassCastException: org.hornetq.jms.client.HornetQConnection 不能转换为 javax.jms.ConnectionFactory


简而言之,有没有人能帮我想办法提供用户名和密码,让上述代码连接到 HornetQ,同时仍然可以继续使用 JmsTemplate?

4

1 回答 1

1

只需使用您自己的连接工厂。它对我有用:

spring bean(5445是hornetq接受器端口):

<!-- Custom HornetQ connection factory that carries the credentials itself.
     commaSepratedServerUrls takes one or more host:port entries separated by commas;
     5445 is the HornetQ acceptor port in this example. -->
<bean name="jmsConnectionFactory" class="messaging.jms.CustomHornetQJMSConnectionFactory">
    <constructor-arg name="ha" value="false" /> <!-- set to true if you want failover support -->
    <constructor-arg name="commaSepratedServerUrls" value="127.0.0.1:5445" />
    <property name="username" value="admin" />
    <property name="password" value="admin" />
</bean>

连接工厂实现(使用来自 hornetq-jms-client 的 HornetQJMSConnectionFactory 和来自 hornetq-core-client 的 TransportConfiguration):

/**
 * HornetQ JMS connection factory that stores a user name and password and applies
 * them whenever the no-argument {@link #createConnection()} is called.
 *
 * <p>This lets code that only calls {@code createConnection()} obtain authenticated
 * connections without knowing the credentials.
 */
public class CustomHornetQJMSConnectionFactory extends org.hornetq.jms.client.HornetQJMSConnectionFactory
{
    private static final long serialVersionUID = 1L;

    private String username;
    private String password;

    /**
     * @param ha                      {@code true} to create the factory with HA support
     * @param commaSepratedServerUrls one or more {@code host:port} entries separated by commas
     * @throws IllegalArgumentException if the list is empty or an entry is not a valid {@code host:port}
     */
    public CustomHornetQJMSConnectionFactory(boolean ha, String commaSepratedServerUrls)
    {
        super(ha, converToTransportConfigurations(commaSepratedServerUrls));
    }

    /**
     * Converts a comma separated list of {@code host:port} entries into Netty
     * transport configurations, one per entry and in the same order.
     *
     * <p>Whitespace around entries, hosts and ports is ignored. The port is stored as
     * an integer value.
     *
     * @param commaSepratedServerUrls one or more {@code host:port} entries separated by commas
     * @return one transport configuration per entry
     * @throws IllegalArgumentException if the list is null/blank, or an entry has no
     *                                  host, no port, or a non-numeric port
     */
    public static TransportConfiguration[] converToTransportConfigurations(String commaSepratedServerUrls)
    {
        if (commaSepratedServerUrls == null || commaSepratedServerUrls.trim().isEmpty())
        {
            throw new IllegalArgumentException("At least one host:port server URL is required");
        }

        String[] serverUrls = commaSepratedServerUrls.split(",");
        TransportConfiguration[] transportconfigurations = new TransportConfiguration[serverUrls.length];
        for (int i = 0; i < serverUrls.length; i++)
        {
            String serverUrl = serverUrls[i].trim();
            String[] urlParts = serverUrl.split(":");
            // Fail with a clear message instead of an ArrayIndexOutOfBoundsException.
            if (urlParts.length < 2 || urlParts[0].trim().isEmpty())
            {
                throw new IllegalArgumentException("Server URL must have the form host:port but was '" + serverUrl + "'");
            }

            int port;
            try
            {
                port = Integer.parseInt(urlParts[1].trim());
            }
            catch (NumberFormatException e)
            {
                throw new IllegalArgumentException("Invalid port in server URL '" + serverUrl + "'", e);
            }

            HashMap<String, Object> map = new HashMap<String, Object>();
            map.put(TransportConstants.HOST_PROP_NAME, urlParts[0].trim());
            map.put(TransportConstants.PORT_PROP_NAME, Integer.valueOf(port));
            transportconfigurations[i] = new TransportConfiguration(NettyConnectorFactory.class.getName(), map);
        }
        return transportconfigurations;
    }

    /**
     * Creates a connection using the configured user name and password.
     *
     * @return a new connection authenticated with the stored credentials
     * @throws JMSException if the connection cannot be created
     */
    @Override
    public Connection createConnection() throws JMSException
    {
        return super.createConnection(username, password);
    }

    public String getUsername()
    {
        return username;
    }

    public void setUsername(String username)
    {
        this.username = username;
    }

    public String getPassword()
    {
        return password;
    }

    public void setPassword(String password)
    {
        this.password = password;
    }
}

现在,只要把这个连接工厂提供给 JmsTemplate,就可以使用该用户名和密码发送/消费消息了。

于 2013-04-25T19:01:48.067 回答