1

I am trying to understand the behaviour of Aggregator when using JDBCMessageStore. Below is my use case:

1) Read message(order details) from queue.

<?xml version="1.0" encoding="UTF-8"?>
<order xmlns="http://www.example.org/orders">
  <orderItem>
    <isbn>12333454443</isbn>
    <quantity>4</quantity>
  </orderItem>
  <orderItem>
    <isbn>545656777</isbn>
    <quantity>50</quantity>
  </orderItem>
  ..
  ..
</order>

One order message will contain multiple orderItem.

2) Split the order message based on the count of orderItem

3) Aggregate into 4 orderItem message; messageCountReleaseStrategy is used with threshold = 4

4) Forward to aggregated message to service activator.

I was observing the data in 3 tables : INT_MESSAGE_GROUP, INT_MESSAGE and INT_GROUP_TO_MESSAGE and seems like data is initially inserted in these 3 tables and when the group is released , the date is removed/deleted from these tables.

I wanted to check DB insert failures in one of the tables (INT_GROUP_TO_MESSAGE). To simulate the error , I dropped the table INT_GROUP_TO_MESSAGE.

The below test was run:

1) Posted 1 order message with 3 orderItem

2) Got an error with message - Table does not exist - (Expected as INT_GROUP_TO_MESSAGE does not exist)

3) I could find records in 2 tables

a) INT_MESSAGE_GROUP had 1 record.

b) INT_MESSAGE had 23 records. Seems like as the message was retried multiple times, multiple entries was done.

4) Created the table INT_GROUP_TO_MESSAGE

5) Posted 1 order message with 1 orderItem

6) The original message(step 1) alongwith the new one (Step 5) was picked up from queue and processed.

7) Aggregator released a group a 4 messages.

8) However in database table - INT_MESSAGE there still exists 23 records, whereas other tables are empty.

I simulated the Db error my dropping the table , but in production we may encounter memory, tablespace issues that can cause these inserts to fail.

My question is can we set any parameter so that we have transaction in 3 tables : INT_MESSAGE_GROUP, INT_MESSAGE and INT_GROUP_TO_MESSAGE?

Below is my configuration

<int-jms:message-driven-channel-adapter id="jmsIn"
                                      channel="mqInbound"
                                      destination="requestQueue" 
                                      message-converter="orderMessageConverter"/>


<int:splitter input-channel="mqInbound"  output-channel="item" expression="payload.orderItem"/>


<int:aggregator input-channel="item"                 output-channel="itemList"
              ref="orderAggregator"                  method="sendList"
              correlation-strategy="orderAggregator" correlation-strategy-method="groupOrders"
              expire-groups-upon-completion="true"   release-strategy="messageCountReleaseStrategy" 
              message-store="messageStore"           discard-channel="aggregatorDiscardChannel" />


<int-jdbc:message-store id="messageStore" data-source="jdbcDatasource"  table-prefix="MY_INT_"/>

  <bean id="messageCountReleaseStrategy"     class="org.springframework.integration.aggregator.MessageCountReleaseStrategy">
    <constructor-arg index="0" value="4"/>
  </bean>
4

0 回答 0