4

我正在支持测试Spring-AMQPSpring-Integration我已经进行了配置和测试:

<rabbit:connection-factory id="connectionFactory" />
<rabbit:queue name="durableQ"/>
<int:channel id="consumingChannel">
    <int:queue capacity="2"/> <!-- Message get Acked as-soon-as filled in Q -->
</int:channel>

<int-amqp:inbound-channel-adapter 
    channel="consumingChannel"
    queue-names="durableQ" 
    connection-factory="connectionFactory"
    concurrent-consumers="1"
    acknowledge-mode="AUTO"
    />


public static void main(String[] args) {
System.out.println("Starting consumer with integration..");
    AbstractApplicationContext context = new ClassPathXmlApplicationContext(
    "classpath:META-INF/spring/integration/spring-integration-context-consumer.xml");

    PollableChannel consumingChannel = context.getBean("consumingChannel",   
                                                          PollableChannel.class);           
        int count = 0;
        while (true) {
            Message<?> msg = consumingChannel.receive(1000);
            System.out.println((count++) + " \t -> " + msg);

            try { //sleep to check number of messages in queue
                Thread.sleep(50000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

在此配置中,很明显,一旦消息到达,consumingChannel它们就会被确认并因此从队列中删除。sleep我通过在后面放高receive并检查来验证这一点queue-size。没有进一步的控制。

现在,如果我设置acknowledge-mode=MANUAL,似乎没有办法ack通过弹簧集成进行手动操作。

我需要处理消息,并在处理后执行此操作manual-ack,直到ack消息保持在durableQ.

有什么方法可以处理MANUALackspring-amqp-integration吗?我想避免传递ChannelAwareMessageListener给,inbound-channel-adapter因为我想控制消费者的receive.

更新:

listener-container使用 own with时,这似乎是不可能的inbound-channel-adapter

// Below creates a default direct-channel (spring-integration channel) named "adapter", to receive poll this channel which is same as above
<int-amqp:inbound-channel-adapter id="adapter" listener-container="amqpListenerContainer" /> 

<bean id="amqpListenerContainer" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="queueNames" value="durableQ" />
    <property name="acknowledgeMode" value="MANUAL" />

// messageListener not allowed when using with adapter, so no way of having own ChannelAwareMessageListener, so no channel exposed onMessage, hence no way to ack
    <property name="messageListener" ref="listener"/>
</bean>
<bean id="listener" class="com.sd.springint.rmq.MsgListener"/>

messageListener由于不允许使用属性,上述配置会引发错误,请参阅标记的内联注释。所以使用的目的listner-container被打败了(暴露channelvia ChannelAwareMessageListener)。

对我来说spring-integration不能用于manual-acknowledgement(我知道,这是一个很难说的说法!),任何人都可以帮助我验证这一点,或者我是否缺少任何特定的方法/配置?

4

2 回答 2

3

问题是因为您使用QueueChannel. 通常最好控制容器中的并发性 ( concurrent-consumers="2") 并且不要在您的流程中执行任何异步切换 (使用DirectChannels)。这样,AUTO ack 就可以正常工作了。而不是从PollableChannel订阅 a接收new MessageHandler()到 a SubscribableChannel

更新:

您通常不需要在 SI 应用程序中处理消息,但相当于您使用 DirectChannel 进行的测试是......

    SubscribableChannel channel = context.getBean("fromRabbit", SubscribableChannel.class);

    channel.subscribe(new MessageHandler() {

        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            System.out.println("Got " + message);
        }
    });
于 2014-01-10T13:29:00.223 回答
0

MANUAL Ack 仅允许通过Channel.basicAck(). 因此,您应该可以访问Channel收到您的消息的 .

尝试玩advice-chainof <int-amqp:inbound-channel-adapter>

  1. 实施一些Advice作为MethodBeforeAdvice
  2. on advice-chainContainer 申请ContainerDelegate#invokeListener
  3. 该方法的第一个参数恰好是Channel
  4. 假设你可以把MessageProperties.headers那个Channel放在那个里面Advice
  5. <int-amqp:inbound-channel-adapter>mapped-request-headers对其进行配置Channel
  6. 最后尝试在下游流的任何位置从 Spring Integration Message调用basicAck()该标头。Channel
于 2014-01-10T13:20:23.980 回答