我的环境是 Glassfish 3.1.2 b23,在带有 JDK 1.7.0_45 的 Windows Server 2008 上以嵌入式模式捆绑了 OpenMQ(以及 FWIW,我也尝试了具有相同结果的 LOCAL 模式)。
我有一个包含一个非常简单的 JMS 组件的应用程序。JMS 组件由一个无状态会话 bean 组成,该 bean 生成发送到队列并由 MDB 使用的消息。JMS 连接池和队列是使用 Glassfish 管理 UI 创建的管理对象。
问题是,在创建和使用消息一段时间后,通常会出现以下错误:
com.sun.messaging.jms.JMSException: MQRA:DCF:allocation failure:createConnection:Error in allocating a connection. Cause: In-use connections equal max-pool-size and expired max-wait-time. Cannot allocate more connections.
at com.sun.messaging.jms.ra.DirectConnectionFactory._allocateConnection(DirectConnectionFactory.java:548)
at com.sun.messaging.jms.ra.DirectConnectionFactory.createConnection(DirectConnectionFactory.java:265)
at com.sun.messaging.jms.ra.DirectConnectionFactory.createConnection(DirectConnectionFactory.java:244)
at
[SNIP]
Caused by: javax.resource.spi.ResourceAllocationException: Error in allocating a connection. Cause: In-use connections equal max-pool-size and expired max-wait-time. Cannot allocate more connections.
at com.sun.enterprise.connectors.ConnectionManagerImpl.internalGetConnection(ConnectionManagerImpl.java:307)
at com.sun.enterprise.connectors.ConnectionManagerImpl.allocateConnection(ConnectionManagerImpl.java:236)
at com.sun.enterprise.connectors.ConnectionManagerImpl.allocateConnection(ConnectionManagerImpl.java:165)
at com.sun.enterprise.connectors.ConnectionManagerImpl.allocateConnection(ConnectionManagerImpl.java:160)
at com.sun.messaging.jms.ra.DirectConnectionFactory._allocateConnection(DirectConnectionFactory.java:543)
... 99 more
Caused by: com.sun.appserv.connectors.internal.api.PoolingException: In-use connections equal max-pool-size and expired max-wait-time. Cannot allocate more connections.
at com.sun.enterprise.resource.pool.ConnectionPool.getResource(ConnectionPool.java:418)
at com.sun.enterprise.resource.pool.PoolManagerImpl.getResourceFromPool(PoolManagerImpl.java:245)
at com.sun.enterprise.resource.pool.PoolManagerImpl.getResource(PoolManagerImpl.java:170)
at com.sun.enterprise.connectors.ConnectionManagerImpl.getResource(ConnectionManagerImpl.java:332)
at com.sun.enterprise.connectors.ConnectionManagerImpl.internalGetConnection(ConnectionManagerImpl.java:301)
... 103 more
这似乎发生在由@Asynchronous
无状态 EJB 方法启动的自动化过程翻阅项目列表并为每个项目调用最终触及此消息生成的代码路径时。当从稍微不同的代码路径调用消息生成时,一次只生成一条消息,我们看不到这个问题。
消息只需毫秒。生产和女士。消费。连接的最大等待时间为 1 分钟。也是最大值。可以同时生成消息的线程数少于与 JMS 连接工厂关联的最大连接数(大约 2 倍)。
会话是非 jms 事务和 AUTO-ACK 的生产者可以多快被背靠背调用?我会假设一旦 producer.send(..) 返回 jms 连接将被返回并可供下一个生产者使用,或者在连接被释放以用于下一次调用 .send 之前有一些非常小的延迟时间()?
消费者是一个 MDB,我假设它也使用来自生产者正在使用的同一个池的 JMS 连接(因为这是唯一的 JMS 连接池)。下面的 imqcmd 输出表明有 1 个活动使用者,但由于 MDB 是池化的,这是否意味着使用者端也可能使用 X #(MDB 实例数)的连接,或者 MDB 将只使用一个连接,而不管 # of池化 MDB 实例?
我也很好奇 MDB 中的异常处理。该模式取自 JavaEE6 示例和捕获/日志 Throwable。Throwable 中缺少对 mdc.setRollback 的调用会泄漏 JMS 连接吗?
在调试这个问题时,我一直在使用imqcmd命令。具体来说:
imqcmd查询 dst -tq -n MyQueue -u admin
---------------------------------------
Destination Name Destination Type
---------------------------------------
MyQueue Queue
On the broker specified by:
-------------------------
Host Primary Port
-------------------------
localhost 7676
Destination Name MyQueue
Destination Type Queue
Destination State RUNNING
Created Administratively false
Current Number of Messages
Actual 0
Remote 0
Held in Transaction 0
Current Message Bytes
Actual 0
Remote 0
Held in Transaction 8928
Current Number of Producers 0
Current Number of Active Consumers 1
Current Number of Backup Consumers 0
Max Number of Messages 100000
Max Total Message Bytes 10737418240
Max Bytes per Message 10485760
Max Number of Producers 100
Max Number of Active Consumers unlimited (-1)
Max Number of Backup Consumers 0
Limit Behavior REJECT_NEWEST
Consumer Flow Limit 1000
Is Local Destination false
Local Delivery is Preferred false
Use Dead Message Queue true
XML schema validation enabled false
XML schema URI List -
Reload XML schema on failure false
Successfully queried the destination.
生产者:ADetailBean.java
@Stateless
public class ADetailBean.java {
private transient static final Logger log = Logger.getLogger(ADetailBean.java);
@Resource(mappedName = "jms/MyConnectionFactory")
private ConnectionFactory jmsConnectionFactory;
@Resource(mappedName = "jms/myApp/MyQueue")
private javax.jms.Queue myQueue;
@PersistenceContext(unitName = "MyPC")
private EntityManager entityManager;
@EJB
AnotherBean anotherBean;
public void createAMessage(@NotNull AFile file, AnAction action) {
ACollection aCol = aCollectionBean.findReqColForTaskColId(file.getCollectionId());
if (aCol != null) {
ADetail aDetail = new ADetail(file.getFileNumber(), file.getCollectionId(), file.getMd5(), reqCol.getCollectionId(), action);
entityManager.persist(aDetail);
Connection jmsConnection = null;
try {
jmsConnection = jmsConnectionFactory.createConnection();
Session session = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(myQueue);
AMessage aMessage = new AMessage(file.getName());
ObjectMessage myMessage = session.createObjectMessage(aMessage);
producer.send(myMessage);
} catch (JMSException e) {
log.error("Error sending message for file number: " +
file.getFileNumber() + ", md5: " + file.getMd5() +
", action: " + action + " : " + e.getMessage(), e);
} finally {
if (jmsConnection != null) {
try {
jmsConnection.close();
} catch (Exception ex) {
log.warn("Unable to close JMS connection: " + ex.getMessage());
}
}
}
}
//Other methods in this EJB
}
MDB:MyMessageListener.java
@MessageDriven(mappedName = "jms/myApp/MyQueue", activationConfig = {
@ActivationConfigProperty(propertyName = "acknowledgeMode",
propertyValue = "Auto-acknowledge"),
@ActivationConfigProperty(propertyName = "destinationType",
propertyValue = "javax.jms.Queue"),
@ActivationConfigProperty(propertyName = "subscriptionDurability",
propertyValue = "Durable"),
@ActivationConfigProperty(propertyName = "clientId",
propertyValue = "myClient"),
@ActivationConfigProperty(propertyName = "subscriptionName",
propertyValue = "mySub")
})
public class MyMessageListener implements MessageListener {
private static final Logger log = Logger.getLogger(MyMessageListener.class);
@Resource
private MessageDrivenContext mdc;
@PersistenceContext(unitName = "MyPC")
private EntityManager entityManager;
@EJB
ADetailManager aDetailManager;
@EJB
ADetailBean aDetailBean;
@EJB
AFileBean aFileBean;
@Override
public void onMessage(Message message) {
log.debug("MESSAGE received from the MyQueue queue");
try {
if (message instanceof ObjectMessage) {
if (((ObjectMessage) message).getObject() != null &&
((ObjectMessage) message).getObject() instanceof ACalcMessage) {
ACalcMessage msg = (ACalcMessage) ((ObjectMessage) message).getObject();
log.debug("MESSAGE BEAN: Received aCalc msg for name: " + msg.getName());
//Wait for opportunity to process dupe recalculation for name
try {
while (!aDetailManager.obtainNameLock(msg.getName())) {
log.debug("Waiting to obtain name lock for name: " + msg.getName());
Thread.sleep(1000);
}
aDetail aDetail = aDetailBean.findNext(msg.getName());
//Do some processing including JPA queries and entity merges
entityManager.remove(aDetail);
}
} finally {
aDetailManager.releaseNameLock(msg.getName());
}
}
}
} catch (JMSException e) {
log.error("Error processing message: " + e.getMessage());
mdc.setRollbackOnly();
} catch (Throwable te) {
log.error("Error processing message: ", te);
}
}
}
MyConnectionFactory 受管对象设置
Pool Name: jms/MyConnectionFactory
JNDI Name: jms/MyConnectionFactory
Resource Type: javax.jms.ConnectionFactory
Status: [X] Enabled
Initial Minimum Pool Size: 8
Maximum Pool Size: 64
Pool Resize Quantity: 2
Idle Timeout: 300 seconds
Max Wait Time: 60000 Milliseconds
Transaction Support: EMPTY (I assume this means the default level, which is XA?)
Connection Validation: [X] Required
No Additional Properties
在连接池高级选项卡中:
Validate At Most Once: 0 Seconds
Leak Timeout: 0 Seconds
Leak Reclaim: [ ]
Creation Retry Attempts: 0
Retry Interval: 10 Seconds
Pooling: [X] Enabled
Lazy Association: [ ] Enabled
Lazy Connection Enlistment: [ ] Enabled
Associate with Thread: [ ] Enabled
Match Connections: [X] Enabled
Max Connection Usage: 0
队列管理对象设置如下所示:
JNDI Name: jms/myApp/MyQueue
Resource Adapter: jmsra
Resource Type: javax.jms.Queue
Class Name: com.sun.messaging.Queue
Status: [X] Enabled
Additional Properties:
Name | Value
---- -------
Name MyQueue