我创建了 MDB 来从 MQ 中挑选消息并插入到 DB2 中。我已经创建了数据源来获取 WAS 中的数据库连接。它的插入消息。但是由于 MessageListener 的速度,一些消息没有插入,因为连接关闭..请帮我处理这里的连接..
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import javax.naming.NamingException;
@MessageDriven(
activationConfig = { @ActivationConfigProperty(
propertyName = "destinationType", propertyValue = "javax.jms.Queue"), @ActivationConfigProperty(
propertyName = "destination", propertyValue = "jms/MDBQueue")
},
mappedName = "jms/MDBQueue")
public class AsyncMessageConsumerBean implements MessageListener {
// TODO Auto-generated constructor stub
private javax.naming.InitialContext ctx = null;
private javax.sql.DataSource serviceDataSource = null;
private String environment = null;
/**
* @see MessageListener#onMessage(Message)
*/
public void onMessage(Message message) {
// TODO Auto-generated method stub
System.out.println("On Message Started.....");
try{
if (message instanceof javax.jms.BytesMessage)
{
javax.jms.BytesMessage bytesMessage = (javax.jms.BytesMessage) message;
byte[] bytes = new byte[(int) bytesMessage.getBodyLength()];
bytesMessage.readBytes(bytes);
System.out.println("Reply Message");
String replyMessage = new String(bytes, "UTF-8");
System.out.println(" The message received from MQ :-----" + replyMessage);
insertMQMessage(replyMessage);
}else {
javax.jms.TextMessage TextMessage = (javax.jms.TextMessage) message;
System.out.println("----------- The text message received from UM Queue"+TextMessage.getText());
insertMQMessage(TextMessage.getText());
}
}catch (JMSException ex) {
throw new RuntimeException(ex);
}catch(Exception ex){
ex.printStackTrace();
}
}
public void insertMQMessage(String mqMessage) throws Exception
{
Statement stmtsql = null;
Connection connection = null;
try
{
connection = getDBConnection();
System.out.println("Connection Object :"+connection);
String mqMsgTrackerInsertQry = "";
System.out.println("MQ Tracker insert Query:" + mqMsgTrackerInsertQry);
stmtsql = connection.createStatement();
boolean status = stmtsql.execute(mqMsgTrackerInsertQry);
}
catch(Exception e)
{
e.printStackTrace();
throw e;
}
finally
{
if (stmtsql != null)
try {
stmtsql.close();
} catch (SQLException ignore) {
}
if (connection != null)
try {
connection.close();
} catch (SQLException ignore) {
}
}
}
private Connection getDBConnection() throws SQLException {
try {
ctx = new javax.naming.InitialContext();
serviceDataSource = (javax.sql.DataSource) ctx.lookup("jdbc/DB_DS_XA");
System.out.println("Datasource initiallised"+serviceDataSource);
} catch (NamingException e) {
System.out.println("peformanceappraisalstatus: COULDN'T CREATE CONNECTION!");
e.printStackTrace();
}
Connection connection = null;
try {
connection = serviceDataSource.getConnection();
//connection.setAutoCommit(false);
} catch (SQLException e) {
throw e;
}
return connection;
}
}