2

我的环境是 Glassfish 3.1.2 b23,在带有 JDK 1.7.0_45 的 Windows Server 2008 上以嵌入式模式捆绑了 OpenMQ(以及 FWIW,我也尝试了具有相同结果的 LOCAL 模式)。

我有一个包含一个非常简单的 JMS 组件的应用程序。JMS 组件由一个无状态会话 bean 组成,该 bean 生成发送到队列并由 MDB 使用的消息。JMS 连接池和队列是使用 Glassfish 管理 UI 创建的管理对象。

问题是,在创建和使用消息一段时间后,通常会出现以下错误:

com.sun.messaging.jms.JMSException: MQRA:DCF:allocation failure:createConnection:Error in allocating a connection. Cause: In-use connections equal max-pool-size and expired max-wait-time. Cannot allocate more connections.
    at com.sun.messaging.jms.ra.DirectConnectionFactory._allocateConnection(DirectConnectionFactory.java:548)
    at com.sun.messaging.jms.ra.DirectConnectionFactory.createConnection(DirectConnectionFactory.java:265)
    at com.sun.messaging.jms.ra.DirectConnectionFactory.createConnection(DirectConnectionFactory.java:244)
    at 
    [SNIP]
Caused by: javax.resource.spi.ResourceAllocationException: Error in allocating a connection. Cause: In-use connections equal max-pool-size and expired max-wait-time. Cannot allocate more connections.
    at com.sun.enterprise.connectors.ConnectionManagerImpl.internalGetConnection(ConnectionManagerImpl.java:307)
    at com.sun.enterprise.connectors.ConnectionManagerImpl.allocateConnection(ConnectionManagerImpl.java:236)
    at com.sun.enterprise.connectors.ConnectionManagerImpl.allocateConnection(ConnectionManagerImpl.java:165)
    at com.sun.enterprise.connectors.ConnectionManagerImpl.allocateConnection(ConnectionManagerImpl.java:160)
    at com.sun.messaging.jms.ra.DirectConnectionFactory._allocateConnection(DirectConnectionFactory.java:543)
    ... 99 more
Caused by: com.sun.appserv.connectors.internal.api.PoolingException: In-use connections equal max-pool-size and expired max-wait-time. Cannot allocate more connections.
    at com.sun.enterprise.resource.pool.ConnectionPool.getResource(ConnectionPool.java:418)
    at com.sun.enterprise.resource.pool.PoolManagerImpl.getResourceFromPool(PoolManagerImpl.java:245)
    at com.sun.enterprise.resource.pool.PoolManagerImpl.getResource(PoolManagerImpl.java:170)
    at com.sun.enterprise.connectors.ConnectionManagerImpl.getResource(ConnectionManagerImpl.java:332)
    at com.sun.enterprise.connectors.ConnectionManagerImpl.internalGetConnection(ConnectionManagerImpl.java:301)
    ... 103 more 

这似乎发生在由@Asynchronous无状态 EJB 方法启动的自动化过程翻阅项目列表并为每个项目调用最终触及此消息生成的代码路径时。当从稍微不同的代码路径调用消息生成时,一次只生成一条消息,我们看不到这个问题。

消息只需毫秒。生产和女士。消费。连接的最大等待时间为 1 分钟。也是最大值。可以同时生成消息的线程数少于与 JMS 连接工厂关联的最大连接数(大约 2 倍)。

会话是非 jms 事务和 AUTO-ACK 的生产者可以多快被背靠背调用?我会假设一旦 producer.send(..) 返回 jms 连接将被返回并可供下一个生产者使用,或者在连接被释放以用于下一次调用 .send 之前有一些非常小的延迟时间()?

消费者是一个 MDB,我假设它也使用来自生产者正在使用的同一个池的 JMS 连接(因为这是唯一的 JMS 连接池)。下面的 imqcmd 输出表明有 1 个活动使用者,但由于 MDB 是池化的,这是否意味着使用者端也可能使用 X #(MDB 实例数)的连接,或者 MDB 将只使用一个连接,而不管 # of池化 MDB 实例?

我也很好奇 MDB 中的异常处理。该模式取自 JavaEE6 示例和捕获/日志 Throwable。Throwable 中缺少对 mdc.setRollback 的调用会泄漏 JMS 连接吗?

在调试这个问题时,我一直在使用imqcmd命令。具体来说:

imqcmd查询 dst -tq -n MyQueue -u admin

---------------------------------------
Destination Name       Destination Type
---------------------------------------
MyQueue    Queue

On the broker specified by:
-------------------------
Host         Primary Port
-------------------------
localhost    7676

Destination Name                      MyQueue
Destination Type                      Queue
Destination State                     RUNNING
Created Administratively              false

Current Number of Messages
    Actual                            0
    Remote                            0
    Held in Transaction               0
Current Message Bytes
    Actual                            0
    Remote                            0
    Held in Transaction               8928
Current Number of Producers           0
Current Number of Active Consumers    1
Current Number of Backup Consumers    0

Max Number of Messages                100000
Max Total Message Bytes               10737418240
Max Bytes per Message                 10485760
Max Number of Producers               100
Max Number of Active Consumers        unlimited (-1)
Max Number of Backup Consumers        0

Limit Behavior                        REJECT_NEWEST
Consumer Flow Limit                   1000
Is Local Destination                  false
Local Delivery is Preferred           false
Use Dead Message Queue                true
XML schema validation enabled         false
XML schema URI List                   -
Reload XML schema on failure          false

Successfully queried the destination. 

生产者:ADetailBean.java

@Stateless
public class ADetailBean.java {

  private transient static final Logger log = Logger.getLogger(ADetailBean.java);

  @Resource(mappedName = "jms/MyConnectionFactory")
  private ConnectionFactory jmsConnectionFactory;

  @Resource(mappedName = "jms/myApp/MyQueue")
  private javax.jms.Queue myQueue;

  @PersistenceContext(unitName = "MyPC")
  private EntityManager entityManager;

  @EJB
  AnotherBean anotherBean;

  public void createAMessage(@NotNull AFile file, AnAction action) {
    ACollection aCol = aCollectionBean.findReqColForTaskColId(file.getCollectionId());
    if (aCol != null) {
        ADetail aDetail = new ADetail(file.getFileNumber(), file.getCollectionId(), file.getMd5(), reqCol.getCollectionId(), action);
        entityManager.persist(aDetail);
       Connection jmsConnection = null;
        try {
            jmsConnection = jmsConnectionFactory.createConnection();
            Session session = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(myQueue);
            AMessage aMessage = new AMessage(file.getName());
            ObjectMessage myMessage = session.createObjectMessage(aMessage);
            producer.send(myMessage);
        } catch (JMSException e) {
            log.error("Error sending message for file number: " +
                    file.getFileNumber() + ", md5: " + file.getMd5() +
                    ", action: " + action + " : " + e.getMessage(), e);
        } finally {
            if (jmsConnection != null) {
                try {
                    jmsConnection.close();
                } catch (Exception ex) {
                    log.warn("Unable to close JMS connection: " + ex.getMessage());
                }
            }
        }
    }

//Other methods in this EJB

}

MDB:MyMessageListener.java

@MessageDriven(mappedName = "jms/myApp/MyQueue", activationConfig = {
        @ActivationConfigProperty(propertyName = "acknowledgeMode",
                propertyValue = "Auto-acknowledge"),
        @ActivationConfigProperty(propertyName = "destinationType",
                propertyValue = "javax.jms.Queue"),
        @ActivationConfigProperty(propertyName = "subscriptionDurability",
                propertyValue = "Durable"),
        @ActivationConfigProperty(propertyName = "clientId",
                propertyValue = "myClient"),
        @ActivationConfigProperty(propertyName = "subscriptionName",
                propertyValue = "mySub")
})
public class MyMessageListener implements MessageListener {

    private static final Logger log = Logger.getLogger(MyMessageListener.class);

    @Resource
    private MessageDrivenContext mdc;

    @PersistenceContext(unitName = "MyPC")
    private EntityManager entityManager;

    @EJB
    ADetailManager aDetailManager;

    @EJB
    ADetailBean aDetailBean;

    @EJB
    AFileBean aFileBean;

    @Override
    public void onMessage(Message message) {
        log.debug("MESSAGE received from the MyQueue queue");
        try {
            if (message instanceof ObjectMessage) {
                if (((ObjectMessage) message).getObject() != null &&
                        ((ObjectMessage) message).getObject() instanceof ACalcMessage) {
                    ACalcMessage msg = (ACalcMessage) ((ObjectMessage) message).getObject();
                    log.debug("MESSAGE BEAN: Received aCalc msg for name: " + msg.getName());
                    //Wait for opportunity to process dupe recalculation for name
                    try {
                        while (!aDetailManager.obtainNameLock(msg.getName())) {
                            log.debug("Waiting to obtain name lock for name: " + msg.getName());
                            Thread.sleep(1000);
                        }

                          aDetail aDetail = aDetailBean.findNext(msg.getName());

                          //Do some processing including JPA queries and entity merges

                          entityManager.remove(aDetail);
                        }
                    } finally {
                        aDetailManager.releaseNameLock(msg.getName());
                    }

                }
            }
        } catch (JMSException e) {
            log.error("Error processing message: " + e.getMessage());
            mdc.setRollbackOnly();
        } catch (Throwable te) {
            log.error("Error processing message: ", te);
        }
    }
}

MyConnectionFactory 受管对象设置

Pool Name: jms/MyConnectionFactory
JNDI Name: jms/MyConnectionFactory
Resource Type: javax.jms.ConnectionFactory
Status: [X] Enabled

Initial Minimum Pool Size: 8
Maximum Pool Size: 64
Pool Resize Quantity: 2
Idle Timeout: 300 seconds
Max Wait Time: 60000 Milliseconds
Transaction Support: EMPTY  (I assume this means the default level, which is XA?)
Connection Validation: [X] Required 

No Additional Properties

在连接池高级选项卡中:

Validate At Most Once: 0 Seconds
Leak Timeout: 0 Seconds
Leak Reclaim: [ ]
Creation Retry Attempts: 0
Retry Interval: 10 Seconds
Pooling: [X] Enabled
Lazy Association: [ ] Enabled
Lazy Connection Enlistment: [ ] Enabled
Associate with Thread: [ ] Enabled
Match Connections: [X] Enabled
Max Connection Usage: 0

队列管理对象设置如下所示:

JNDI Name: jms/myApp/MyQueue
Resource Adapter: jmsra
Resource Type: javax.jms.Queue
Class Name: com.sun.messaging.Queue
Status: [X] Enabled

Additional Properties:
Name   |  Value
----     -------
Name      MyQueue
4

0 回答 0