I am trying to understand the behaviour of Aggregator when using JDBCMessageStore. Below is my use case:
1) Read message(order details) from queue.
<?xml version="1.0" encoding="UTF-8"?>
<order xmlns="http://www.example.org/orders">
<orderItem>
<isbn>12333454443</isbn>
<quantity>4</quantity>
</orderItem>
<orderItem>
<isbn>545656777</isbn>
<quantity>50</quantity>
</orderItem>
..
..
</order>
One order message will contain multiple orderItem.
2) Split the order message based on the count of orderItem
3) Aggregate into 4 orderItem message; messageCountReleaseStrategy is used with threshold = 4
4) Forward to aggregated message to service activator.
I was observing the data in 3 tables : INT_MESSAGE_GROUP, INT_MESSAGE and INT_GROUP_TO_MESSAGE and seems like data is initially inserted in these 3 tables and when the group is released , the date is removed/deleted from these tables.
I wanted to check DB insert failures in one of the tables (INT_GROUP_TO_MESSAGE). To simulate the error , I dropped the table INT_GROUP_TO_MESSAGE.
The below test was run:
1) Posted 1 order message with 3 orderItem
2) Got an error with message - Table does not exist - (Expected as INT_GROUP_TO_MESSAGE does not exist)
3) I could find records in 2 tables
a) INT_MESSAGE_GROUP had 1 record.
b) INT_MESSAGE had 23 records. Seems like as the message was retried multiple times, multiple entries was done.
4) Created the table INT_GROUP_TO_MESSAGE
5) Posted 1 order message with 1 orderItem
6) The original message(step 1) alongwith the new one (Step 5) was picked up from queue and processed.
7) Aggregator released a group a 4 messages.
8) However in database table - INT_MESSAGE there still exists 23 records, whereas other tables are empty.
I simulated the Db error my dropping the table , but in production we may encounter memory, tablespace issues that can cause these inserts to fail.
My question is can we set any parameter so that we have transaction in 3 tables : INT_MESSAGE_GROUP, INT_MESSAGE and INT_GROUP_TO_MESSAGE?
Below is my configuration
<int-jms:message-driven-channel-adapter id="jmsIn"
channel="mqInbound"
destination="requestQueue"
message-converter="orderMessageConverter"/>
<int:splitter input-channel="mqInbound" output-channel="item" expression="payload.orderItem"/>
<int:aggregator input-channel="item" output-channel="itemList"
ref="orderAggregator" method="sendList"
correlation-strategy="orderAggregator" correlation-strategy-method="groupOrders"
expire-groups-upon-completion="true" release-strategy="messageCountReleaseStrategy"
message-store="messageStore" discard-channel="aggregatorDiscardChannel" />
<int-jdbc:message-store id="messageStore" data-source="jdbcDatasource" table-prefix="MY_INT_"/>
<bean id="messageCountReleaseStrategy" class="org.springframework.integration.aggregator.MessageCountReleaseStrategy">
<constructor-arg index="0" value="4"/>
</bean>