0

如果队列中的所有消息都已处理,我需要停止 ActiveMQ 连接器。这需要在 Mule 流程中完成。

如下所示,我有两个连接器,一个用于读取,另一个用于写入 vci.staging.queue。我想检查队列中是否处理了所有消息,然后禁用阅读器连接器。

下面的脚本使用来自 muleContext 的 client.request 使用队列名称和读取器或写入器连接器总是为我返回“null”。

有什么方法可以获取队列中待处理消息的数量或检查是否所有消息都已处理,以便可以禁用连接器?

<jms:activemq-connector name="jmsConnectorStagingQReaderNormal"
                            brokerURL="${mule.activemq.broker.read.normal.url}" 
                            specification="1.1"
                            maxRedelivery="-1"
                            persistentDelivery="true"
                            numberOfConcurrentTransactedReceivers="${mule.activemq.concurrent.receivers}"
                            connectionFactory-ref="connectionFactory"
                            disableTemporaryReplyToDestinations="false">
        </jms:activemq-connector>

<jms:activemq-connector name="jmsConnectorStagingQWriter"
                            brokerURL="${mule.activemq.broker.write.url}" 
                            specification="1.1"
                            maxRedelivery="-1"
                            persistentDelivery="true"
                            numberOfConcurrentTransactedReceivers="${mule.activemq.concurrent.receivers}"
                            connectionFactory-ref="connectionFactory"
                            disableTemporaryReplyToDestinations="false">
        </jms:activemq-connector>


<script:component>
<script:script engine="groovy">
        if(muleContext.getRegistry().lookupConnector('jmsConnectorStagingQReaderNormal').isStarted()) {
            if(muleContext.client.request("jms://vci.staging.queue?connector= jmsConnectorStagingQReaderNormal ", 5000) == null) {
                muleContext.getRegistry().lookupConnector('jmsConnectorStagingQReaderNormal').stop()
             }
        }                               
        return payload
        </script:script>
</script:component>
4

1 回答 1

1

使用QueueBrowser可以查看 JMS 队列而不使用其消息。

为了这:

  • 创建一个自定义组件,
  • jms:activemq-connector让 Spring在组件中注入你的,
  • 调用getSession(false, false)它以获取活动的 JMS Session
  • 调用createBrowser(Queue queue)Session您可以Queue使用Session.createQueue(..).
于 2013-01-30T18:28:01.720 回答