1

我有一段基于 websphere 的代码,旨在从 MQ 队列中提取消息并对其进行处理。大多数情况下,代码运行良好,但每隔几天,代码会继续运行,但即使队列中仍有消息,它也会停止接收消息。为了让这个过程恢复工作,我需要碰撞应用程序,然后一切都开始正常工作。

消息可能非常大(每条消息最多 4MB),我在 WAS 7 上运行

我没有看到任何似乎代表错误的消息或异常。

这是供人们评论的代码

public class BlipMqProcessor {

    protected static final int ONE_SECOND = 1000;
    protected static final int ONE_HOUR = 60 * 60 * ONE_SECOND;
    protected static final int MQ_READ_TIMEOUT = Integer.parseInt(Constants.MQ_READ_TIMEOUT_IN_SECONDS) * ONE_SECOND;

    protected static int previousMqReasonCode;
    protected static long previousMqErrorTime;

    private BlipXmlProcessor xmlProcessor;

    // Member variables for MQ processing
    protected MQQueueManager qMgr;
    protected MQQueue queue;
    protected MQGetMessageOptions gmo;

    /**
     * Constructs a new BlipMqProcessor with the given values.
     */
    public BlipMqProcessor() {
        this(new BlipXmlProcessor());
    }

    /**
     * Constructs a new BlipMqProcessor with the given values.
     * @param xmlProcessor the processor that will be used to create the
     *      staging table entries.
     */
    public BlipMqProcessor(final BlipXmlProcessor xmlProcessor) {
        super();
        this.xmlProcessor = xmlProcessor;
    }

    /**
     * Reads XML messages from the Constants.MQ_ACCESS_QUEUE_NAME
     * 
     * @throws BlipModelException if there are any 
     */
    public void readFromMQ() throws BlipModelException {
        try {
            createNewConnectionToMQ();
            while(true) {
                MQMessage outputMessage = new MQMessage();
                queue.get(outputMessage,gmo);
                String blipModelXml = outputMessage.readLine();
                BlipLogs.logXML("BlipREQ", "0", blipModelXml);
                processMessage(blipModelXml);
                qMgr.commit();
            }
        } catch (final MQException e) {
            if (e.reasonCode != MQException.MQRC_NO_MSG_AVAILABLE) {
                handleMqException(e);
            }
        } catch (final IOException e) {
            throw new BlipModelException("MQ", "Error reading MQ message.", "BlipMqProcessor.readFromMQ", e);
        } finally {
            cleanupMQResources();
        }
    }


    /**
     * Clean up MQ resources.
     */
    private void cleanupMQResources() {
        // Close queue
        if(queue != null) {
           try {
              queue.close();
           }catch(final MQException e) {
               BlipModelLogger.error("MQ", "BlipMqProcessor", "Problem closing queue: " + e);
           }
        }
        // Disconnect queue manager
        if(qMgr != null) {
            try {
                qMgr.disconnect();
            } catch (final MQException e) {
                BlipModelLogger.error("MQ", "BlipMqProcessor", "Problem disconnecting from qMgr: " + e);
            }
        }
    }

    protected void createNewConnectionToMQ() throws MQException {
        try {
            MQEnvironment.hostname = Constants.MQ_HOST;
            MQEnvironment.channel = Constants.MQ_CHANNEL;
            MQEnvironment.port      = Integer.parseInt(Constants.MQ_PORT);
            if(Constants.MQ_SSL_CIPHER_SUITE != null) {
                MQEnvironment.sslCipherSuite = Constants.MQ_SSL_CIPHER_SUITE;
                MQEnvironment.sslPeerName = Constants.MQ_SSL_PEER;
            } else {
                MQEnvironment.sslCipherSuite = "";
                MQEnvironment.sslPeerName = "";
            }

            qMgr = new MQQueueManager(Constants.MQ_QMGR);
            int openOptions = MQC.MQOO_INPUT_AS_Q_DEF;
            queue = qMgr.accessQueue(Constants.MQ_IN_ACCESS_QUEUE, openOptions);
            gmo = new MQGetMessageOptions();
            gmo.options = MQC.MQGMO_WAIT | MQC.MQGMO_SYNCPOINT | MQC.MQGMO_FAIL_IF_QUIESCING;
            gmo.waitInterval = MQ_READ_TIMEOUT;
        } finally {
            MQEnvironment.sslCipherSuite = "";
            MQEnvironment.sslPeerName = "";
        }
    }

    protected void handleMqException(final MQException e) {
        long currentTime = System.currentTimeMillis();
        long timeBetweenMqErrors = currentTime - previousMqErrorTime;
        if (previousMqReasonCode != e.reasonCode || timeBetweenMqErrors > ONE_HOUR) {
            previousMqReasonCode = e.reasonCode;
            previousMqErrorTime = currentTime;
            BlipModelLogger.error("MQ", "BlipMqProcessor", "MQException reading from Access Queue: " + e);
        }
    }


}
4

1 回答 1

0

更改 readFromMQ 方法;

public void readFromMQ() throws BlipModelException {
    try {
        createNewConnectionToMQ();
        while(true) {
          try {
            MQMessage outputMessage = new MQMessage();
            queue.get(outputMessage,gmo);
            String blipModelXml = outputMessage.readLine();
            BlipLogs.logXML("BlipREQ", "0", blipModelXml);
            processMessage(blipModelXml);
            qMgr.commit();
          } catch (MQException e) {
            if (e.reasonCode != MQException.MQRC_NO_MSG_AVAILABLE) {
              throw e;
            }
          }
        }
    } catch (final MQException e) {
      handleMqException(e);
    } catch (final IOException e) {
        throw new BlipModelException("MQ", "Error reading MQ message.", "BlipMqProcessor.readFromMQ", e);
    } finally {
        cleanupMQResources();
    }
}

将是快速的解决方案;但不优雅。这里有很大的重构空间。

发生的事情是你实际上得到了一个 MQRC_NO_MSG_AVAILABLE 因为有趣的是,没有另一条消息可以检索(在某个时间点,而不是你想的时候);当您决定忽略该异常时,您已经退出了 while(true) 循环。

您不能使用queue.getCurrentQueueDepth(),因为它性能不高(显然),还因为它不适用于别名队列或集群上的队列。差不多就是这样;糟透了。

于 2012-06-29T19:31:40.247 回答