An external module sends thousands of messages to the message broker. Each message has a TimeToLive property of 5 seconds. Another module should consume and process ALL of the messages.
From the Spring Integration documentation I learned that Staged Event-Driven Architecture (SEDA) consumers respond better to significant spikes in load.
My current implementation uses EDA (Event-Driven Architecture), e.g.:
<si:channel id="inputChannel"/>

<!-- get messages from the SSO queue -->
<int-jms:message-driven-channel-adapter id="messageDrivenAdapter"
    channel="inputChannel" destination="sso" connection-factory="connectionFactory"
    max-concurrent-consumers="1" auto-startup="true" acknowledge="transacted" extract-payload="true"/>

<si:service-activator id="activatorClient" input-channel="inputChannel"
    ref="messageService" method="processMessage"/>

<bean id="messageService" class="com.my.messaging.MessageService"/>

<bean id="sso" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="SSO"/>
</bean>
Obviously, under heavy load, e.g. thousands of incoming messages, processMessage() can take longer than 5 seconds, and MessageService may not handle all the messages before they expire.
My ideas are the following:
1) Modify processMessage() so that each message, instead of being processed, is only stored in MongoDB. The messages could then be processed by a separate task, independently of their arrival rate. In this scenario MongoDB would serve as a CACHE.
2) Use a large number of concurrent consumers (SEDA model); inputChannel remains a direct channel.
3) Process the messages asynchronously, i.e. make inputChannel a queue channel so that the messages are buffered and processed asynchronously (see the sketch after this list).
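For scenario 3) the configuration could look roughly like the sketch below. The queue capacity, poller interval, and executor pool size are untuned placeholder values, and I assume the Spring task namespace is declared:

<!-- buffer incoming messages instead of processing them on the JMS listener thread -->
<si:channel id="inputChannel">
    <si:queue capacity="10000"/>
</si:channel>

<!-- poll the buffer and process messages on a separate thread pool -->
<si:service-activator id="activatorClient" input-channel="inputChannel"
    ref="messageService" method="processMessage">
    <si:poller fixed-rate="50" max-messages-per-poll="100" task-executor="processExecutor"/>
</si:service-activator>

<task:executor id="processExecutor" pool-size="10"/>

One concern with this variant: since acknowledge="transacted" only spans the hand-off to the in-memory queue, any messages still sitting in the buffer would be lost if the consumer crashed (unless the queue were backed by a persistent message store).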
Before making a decision I would like to ask which scenario is more effective. Perhaps scenarios 2) and 3) provide a mechanism for meeting my requirement that ALL messages must be processed, even under heavy load.
EDIT:
I have already implemented scenario 2), sending 1000 messages per second. These are the statistics showing how many of the sent messages were actually received, with varying parameters:
max-concurrent-consumers | TimeToLive=5 secs | idle-consumer-limit | # of sent messages | # of received messages
10                       | yes               | 1                   | 1001               | 297
100                      | yes               | 1                   | 1001               | 861
150                      | yes               | 1                   | 1001               | 859
300                      | yes               | 1                   | 1001               | 861
300                      | no                | 1                   | 1001               | 860
300                      | no                | 100                 | 1001               | 1014
300                      | no                | 50                  | 1001               | 1011
It seems that idle-consumer-limit creates consumers more aggressively than max-concurrent-consumers. Is it a good approach to use idle-consumer-limit in such a scenario?
These are my config files for the sender and the consumer:
<!-- SENDER: the keep-alive sender sends messages to the backup server -->
<si:channel id="sendToChannel"/>
<si:channel id="presChannel"/>

<si:inbound-channel-adapter id="senderEntity" channel="sendToChannel" method="sendMessage">
    <bean class="com.ucware.ucpo.sso.cache.CacheSender"/>
    <si:poller fixed-rate="${sender.sendinterval}"/>
</si:inbound-channel-adapter>

<si:router id="messageRouter" method="routeMessage" input-channel="sendToChannel">
    <bean class="com.ucware.ucpo.sso.messaging.MessageRouter"/>
</si:router>

<!-- subscriber to the channel dispatcher; sends messages to JMS -->
<int-jms:outbound-channel-adapter explicit-qos-enabled="${jms.qos.enabled}" time-to-live="${jms.message.lifetime}"
    channel="presChannel" connection-factory="connectionFactory" destination="pres" extract-payload="false"/>

<bean id="pres" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="PRES"/>
</bean>
<!-- RECEIVER -->
<si:channel id="receiveChannel"/>

<!-- get messages from the PRES queue -->
<int-jms:message-driven-channel-adapter id="messageDrivenAdapter"
    channel="receiveChannel" destination="presence" connection-factory="connectionFactory" idle-consumer-limit="50"
    max-concurrent-consumers="300" auto-startup="true" acknowledge="transacted" extract-payload="true"/>

<si:service-activator id="activatorClient" input-channel="receiveChannel"
    ref="messageService" method="processMessage"/>

<bean id="messageService" class="com.cache.MessageService"/>