1

I have a Spring integration flow defined like this:

IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName")
                    .id("id")
                    .acknowledgeMode(MANUAL)
                    .autoStartup(autoStartup)
                    .concurrentConsumers(2)
                    .maxConcurrentConsumers(3)
                    .messageConverter(messageConverter()))
                    .aggregate(a -> ...)
                    .handle(serviceActivatorBean, "myMethod", e -> e.advice(requestHandlerRetryAdviceForIntegrationFlow()))
                    .get();

And a service activator defined like this:

@Component
@Transactional
public class ServiceActivator {

    @ServiceActivator    
    public void myMethod(List<> Collection<MyEvent> events) {
        ....
    }    
}

What I'm trying to do is to change the myMethod to also take a list of particular headers associated with each message in the aggregation. In my case I'd like to get a hold of the AmqpHeaders.CHANNEL and AmqpHeaders.DELIVERY_TAG for each message so that I can perform an ACK or NACK for each message to RabbitMQ (note that I'm deliberately using manual acknowledgement mode in the IntegrationFlow since I want to send the ACK's/NACK's after myMethod has executed).

I've tried for example this approach:

@Component
@Transactional
public class ServiceActivator {

    @ServiceActivator    
    public void myMethod(@Header(AmqpHeaders.CHANNEL) List<Channel> channels, 
                         @Header(AmqpHeaders.DELIVERY_TAG) List<Long> tags, 
                         Collection<MyEvent> events) {
        ....
    }    
}

but here I only seem to get header values for the last message (i.e. channels and tags are always of size 1 even though there are multiple events in the events collection).

I've also tried changing Collection<MyEvent> to Collection<Message> (org.springframework.messaging.Message) in order to try to manually extract the headers but this fails with:

org.springframework.messaging.MessageHandlingException: nested exception is java.lang.ClassCastException: com.x.y.MyEvent cannot be cast to org.springframework.messaging.Message

since the message has already been transformed by the message converter defined in the IntegrationFlow.

How can I achieve this?

4

1 回答 1

1

You need a custom outputProcessor on your aggregator to return the message you desire (with lists of channels/delivery tags in headers and list of events in the payload).

于 2017-11-27T14:01:19.977 回答