I have a Spring Integration flow defined like this:
    IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName")
                    .id("id")
                    .acknowledgeMode(MANUAL)
                    .autoStartup(autoStartup)
                    .concurrentConsumers(2)
                    .maxConcurrentConsumers(3)
                    .messageConverter(messageConverter()))
            .aggregate(a -> ...)
            .handle(serviceActivatorBean, "myMethod", e -> e.advice(requestHandlerRetryAdviceForIntegrationFlow()))
            .get();
And a service activator defined like this:
    @Component
    @Transactional
    public class ServiceActivator {
        @ServiceActivator
        public void myMethod(Collection<MyEvent> events) {
            ....
        }
    }
What I'm trying to do is change myMethod to also take a list of particular headers associated with each message in the aggregation. In my case I'd like to get hold of AmqpHeaders.CHANNEL and AmqpHeaders.DELIVERY_TAG for each message so that I can send an ACK or NACK to RabbitMQ for each message individually. (Note that I'm deliberately using manual acknowledgement mode in the IntegrationFlow, since I want to send the ACKs/NACKs after myMethod has executed.)
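To make the goal concrete, here is a minimal plain-Java sketch of the shape of data I'd like myMethod to end up with: one channel and one delivery tag per event. Everything in it (PerMessageAckSketch, Delivery, pair, and the String channel ids standing in for the real Channel objects) is a hypothetical stand-in made up for illustration, not Spring Integration or RabbitMQ client API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration only: none of these types come from Spring or the RabbitMQ client.
final class PerMessageAckSketch {

    /** Hypothetical holder pairing one event with the ack data of the message it arrived in. */
    static final class Delivery<T> {
        final T event;
        final String channelId; // stand-in for the AmqpHeaders.CHANNEL value
        final long deliveryTag; // stand-in for the AmqpHeaders.DELIVERY_TAG value

        Delivery(T event, String channelId, long deliveryTag) {
            this.event = event;
            this.channelId = channelId;
            this.deliveryTag = deliveryTag;
        }
    }

    /** Pairs the i-th event with the i-th channel and tag; rejects lists of different sizes. */
    static <T> List<Delivery<T>> pair(List<T> events, List<String> channelIds, List<Long> tags) {
        if (events.size() != channelIds.size() || events.size() != tags.size()) {
            throw new IllegalArgumentException("expected one channel and one tag per event, got "
                    + events.size() + " events, " + channelIds.size() + " channels, "
                    + tags.size() + " tags");
        }
        List<Delivery<T>> result = new ArrayList<>(events.size());
        for (int i = 0; i < events.size(); i++) {
            result.add(new Delivery<>(events.get(i), channelIds.get(i), tags.get(i)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Delivery<String>> deliveries = pair(
                Arrays.asList("event-1", "event-2"),
                Arrays.asList("channel-A", "channel-A"),
                Arrays.asList(7L, 8L));
        for (Delivery<String> d : deliveries) {
            // In the real handler this is where the per-message ACK or NACK would go.
            System.out.println(d.event + " -> " + d.channelId + " / tag " + d.deliveryTag);
        }
    }
}
```

The size check in pair is the important part: the handler can only acknowledge every message if it receives exactly one channel and one tag for each event in the aggregated collection.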
I've tried for example this approach:
    @Component
    @Transactional
    public class ServiceActivator {
        @ServiceActivator
        public void myMethod(@Header(AmqpHeaders.CHANNEL) List<Channel> channels,
                             @Header(AmqpHeaders.DELIVERY_TAG) List<Long> tags,
                             Collection<MyEvent> events) {
            ....
        }
    }
but here I only seem to get header values for the last message (i.e. channels and tags are always of size 1, even though there are multiple events in the events collection).
I've also tried changing Collection<MyEvent> to Collection<Message> (org.springframework.messaging.Message) in order to extract the headers manually, but this fails with:
org.springframework.messaging.MessageHandlingException: nested exception is java.lang.ClassCastException: com.x.y.MyEvent cannot be cast to org.springframework.messaging.Message
since the message has already been transformed by the message converter defined in the IntegrationFlow.
How can I achieve this?