2

我是一个新手,但我会尽量保持清醒。

{INPUT QUEUE}->[INBOUND-GATEWAY-1]-->[ROUTER]----------->(激活器)<---------------
                                        \ /
                                         \-->{HOLD QUEUE}--->[INBOUND-GATEWAY-2]--^

我有一个场景,我必须像前者一样动态更改流中的路由条件。来自队列的消息被发送到一个激活器进行处理,或者另一个队列被搁置。在某些时候,我必须关闭 INBOUND-GATEWAY-1 以便没有新消息进入流程,并打开 INBOUND-GATEWAY-2 以处理来自 HOLD QUEUE 的所有消息。一旦来自 HOLD QUEUE 的所有消息都被消费完,两个网关都必须像以前一样关闭/打开。这里的问题是我怎么知道 HOLD QUEUE 何时为空,以便我可以触发可以启动 gateway-1 的方法?

如果有人可以帮助我,我将不胜感激。

提前致谢

4

2 回答 2

1

经过一番调试和阅读,我终于找到了这个问题的解决方案。入站网关是一个 JmsMessageDrivenEndpoint,它基于两个内部组件,一个 MessageListenerContainer 和一个 MessageListener。MessageListenerContainer 是负责调度 MessageListener 行为的人,因此,覆盖 noMessageReceived 和 messageReceived,并添加一些属性来控制所需的行为,我可以做到“魔术”。

我的 MessageListenerContainer 实现是这样的。

public class ControlMessageListenerContainer extends DefaultMessageListenerContainer{

    private JmsMessageDrivenEndpoint mainInputGateway;

    private long timeOut;

    private long lastTimeReceived;  

    public PassControlMessageListenerContainer() {
        this.setAutoStartup(false);
    }

    @Override
    public void start() throws JmsException {
        /*When the container is started the lastTimeReceived is set to actial time*/
        lastTimeReceived = (new Date()).getTime();
        super.start();
    }

    @Override
    protected void noMessageReceived(Object invoker, Session session) {
        long actualTime = (new Date()).getTime();

        if((actualTime - lastTimeReceived) >= timeOut 
                && mainInputGateway != null && !mainInputGateway.isRunning()){
            mainInputGateway.start();
        }       
        super.noMessageReceived(invoker, session);
    }

    @Override
    protected void messageReceived(Object invoker, Session session) {
        /*lastTimeReceived is set again to actual time at new message arrive*/
        lastTimeReceived = (new Date()).getTime();
        super.messageReceived(invoker, session);
    }
}

最后,spring bean 配置如下:

<bean id="listenerContainer" 
    class="org.merol.ControlMessageListenerContainer">
    <property name="mainInputGateway" ref="mainGateway" />
    <property name="destination" ref="onHoldQueue" />
    <property name="timeOut" value="10000"/>
    <property name="connectionFactory" ref="connectionFactory"/>
</bean>

<bean id="messageListener" 
    class="org.springframework.integration.jms.ChannelPublishingJmsMessageListener">
    <property name="requestChannel" ref="outputChannel" />
</bean>

<bean id="inboundGateway" 
    class="org.springframework.integration.jms.JmsMessageDrivenEndpoint">
    <constructor-arg name="listenerContainer" ref="listenerContainer" />
    <constructor-arg name="listener" ref="messageListener" />
</bean>

希望这对其他人有帮助。

感谢@Nicholas 提供线索。

于 2011-05-24T18:17:31.100 回答
0

我会将此功能放入入站网关处理器中。例如:

网关1处理器:

  • start():从主队列和进程中启动消费者。
  • stop():停止消费者。

网关2处理器:

  • start():从 HOLD 队列中启动消费者。指定适当的超时。当超时触发时,(HOLD 队列为空)调用stop()
  • stop():启动Gateway1Processor并停止此消费者。

因此,操作顺序将是:

  1. 启动Gateway1Processor
  2. 某个时间,调用Gateway1Processor.stop()Gateway2Processor.start()
  3. Gateway2Processor将排空 HOLD 队列,重新启动Gateway1Processor然后停止。
  4. 转到#2。
于 2011-05-17T13:29:04.873 回答