0

我创建了 MDB 来从 MQ 中挑选消息并插入到 DB2 中。我已经创建了数据源来获取 WAS 中的数据库连接。它的插入消息。但是由于 MessageListener 的速度,一些消息没有插入,因为连接关闭..请帮我处理这里的连接..

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import javax.naming.NamingException;

@MessageDriven(
        activationConfig = { @ActivationConfigProperty(
                propertyName = "destinationType", propertyValue = "javax.jms.Queue"), @ActivationConfigProperty(
                propertyName = "destination", propertyValue = "jms/MDBQueue")
        }, 
        mappedName = "jms/MDBQueue")
public class AsyncMessageConsumerBean implements MessageListener {
     // TODO Auto-generated constructor stub
    private javax.naming.InitialContext ctx = null;
    private javax.sql.DataSource serviceDataSource = null;
    private String environment = null;

    /**
     * @see MessageListener#onMessage(Message)
     */

public void onMessage(Message message) {
    // TODO Auto-generated method stub
    System.out.println("On Message Started.....");

    try{
        if (message instanceof javax.jms.BytesMessage)
        {
            javax.jms.BytesMessage bytesMessage = (javax.jms.BytesMessage) message;
            byte[] bytes = new byte[(int) bytesMessage.getBodyLength()];

            bytesMessage.readBytes(bytes);
            System.out.println("Reply Message");

            String replyMessage = new String(bytes, "UTF-8");
            System.out.println(" The message received from MQ :-----" + replyMessage);

            insertMQMessage(replyMessage);

        }else {

            javax.jms.TextMessage TextMessage = (javax.jms.TextMessage) message;
            System.out.println("----------- The text message received from UM Queue"+TextMessage.getText());

            insertMQMessage(TextMessage.getText());

        }

        }catch (JMSException ex) {
            throw new RuntimeException(ex);
        }catch(Exception ex){
            ex.printStackTrace();
        }

}
public void insertMQMessage(String mqMessage) throws Exception
{

            Statement stmtsql = null;
    Connection connection = null;

    try 
    {
        connection = getDBConnection();
        System.out.println("Connection Object :"+connection);

        String mqMsgTrackerInsertQry = "";

        System.out.println("MQ Tracker insert Query:" + mqMsgTrackerInsertQry);

        stmtsql = connection.createStatement();
        boolean status  = stmtsql.execute(mqMsgTrackerInsertQry);

    }
    catch(Exception e)
    {
        e.printStackTrace();
        throw e;
    }
    finally 
    {

        if (stmtsql != null)
            try {
                stmtsql.close();
            } catch (SQLException ignore) {
            }
        if (connection != null)
            try {
                connection.close();
            } catch (SQLException ignore) {
            }
    }    

}


private Connection getDBConnection() throws SQLException {

             try {
                ctx = new javax.naming.InitialContext();
                serviceDataSource = (javax.sql.DataSource) ctx.lookup("jdbc/DB_DS_XA");
                System.out.println("Datasource initiallised"+serviceDataSource);
            } catch (NamingException e) {
                System.out.println("peformanceappraisalstatus: COULDN'T CREATE CONNECTION!");
                e.printStackTrace();
            }
             Connection connection = null;
            try {
                 connection = serviceDataSource.getConnection();
                //connection.setAutoCommit(false);

            } catch (SQLException e) {
                throw e;
            }

            return connection;
        }
}
4

0 回答 0