So, I'm trying to route invoices based on a field in the database using the Choice Flow Control. This works fine without using a collection aggregator, however I want to use the aggregator to group the transactions into a single email. However, when I add the collection aggregators the routing gets all mixed up. For example, in my last test run an invoice with LHF_INVOICE_METHOD='INACTV' and IS_ZERO_OR_NEGATIVE='1' got routed to "Trucking" (it routes correctly to "Inactive" without collection aggregation). I think the collection aggregators are causing the routes to get crossed with each other. I setup my MULE_CORRELATION_ID to include LHF_INVOICE_METHOD and IS_ZERO_OR_NEGATIVE so the collection aggregators have their own correlation IDs.
<flow name="InvoiceWorkflow" doc:name="InvoiceWorkflow" processingStrategy= "synchronous">
<jdbc:inbound-endpoint queryKey="GetUnprocessedInvoices" queryTimeout="10000" pollingFrequency= "1000" connector-ref ="Database" doc:name="Get invoice run" >
<jdbc:query key="GetUnprocessedInvoices" value="SELECT COMPANY_CODE, DIVISION, INVOICE_NUMBER, LHF_INVOICE_METHOD, STATUS_FLAG, SYSTEM_DATE, USERNAME, RUN_NUMBER, ROWID, IS_ZERO_OR_NEGATIVE, LHF_INVOICE_METHOD || RUN_NUMBER || IS_ZERO_OR_NEGATIVE AS CORRELATION_ID, LHF_INVOICE_METHOD || RUN_NUMBER || IS_ZERO_OR_NEGATIVE || '2' AS CORRELATION_ID_2 FROM FIN.LHF_INVOICE_WORKFLOW WHERE (STATUS_FLAG = 'N')"/>
</jdbc:inbound-endpoint>
<jdbc:outbound-endpoint exchange-pattern="one-way" queryKey="insert_invoice_run" queryTimeout= "10000" connector-ref="SPTSQL01_APPS_custom_app_data" doc:name= "Load to custom_app_data">
<jdbc:query key="insert_invoice_run" value="INSERT INTO lhf_ros_invoice_workflow ([COMPANY_CODE] ,[DIVISION] ,[INVOICE_NUMBER] ,[LHF_INVOICE_METHOD] ,[STATUS_FLAG] ,[SYSTEM_DATE] ,[USERNAME] ,[RUN_NUMBER] ,[IS_ZERO_OR_NEGATIVE]) VALUES (#[map-payload:COMPANY_CODE] ,#[map-payload:DIVISION] ,#[map-payload:INVOICE_NUMBER] ,#[map-payload:LHF_INVOICE_METHOD], #[map-payload:STATUS_FLAG] ,#[map-payload:SYSTEM_DATE] ,#[map-payload:USERNAME] ,#[map-payload:RUN_NUMBER] ,#[map-payload:IS_ZERO_OR_NEGATIVE])"/>
</jdbc:outbound-endpoint>
<jdbc:outbound-endpoint exchange-pattern="one-way" queryKey="MarkAsProcessed" queryTimeout= "10000" connector-ref="DatabaseMuleLogin" doc:name= "Mark Processed in Ross" >
<jdbc:query key="MarkAsProcessed" value="UPDATE FIN.LHF_INVOICE_WORKFLOW SET STATUS_FLAG = 'P' WHERE (ROWID = #[map-payload:ROWID])"/>
</jdbc:outbound-endpoint>
<message-properties-transformer doc:name="Message Properties">
<add-message-property key="MULE_CORRELATION_GROUP_SIZE" value="1000"/>
<add-message-property key="MULE_CORRELATION_ID" value="#[message.payload.CORRELATION_ID]"/>
</message-properties-transformer>
<choice doc:name="Choice" >
<when expression="#[message.payload.LHF_INVOICE_METHOD == 'INACTV']">
<processor-chain>
<collection-aggregator failOnTimeout="false" doc:name="Inactive" timeout="2000"/>
<smtp:outbound-endpoint responseTimeout="10000" doc:name="Inactive" from="muleservice@example.com" subject="[Invoice Workflow] Inactive 1" to="ben@example.com" host="mail.example.com"/>
<jdbc:outbound-endpoint exchange-pattern="one-way" queryKey="MARK_COMPLETE" queryTimeout="-1" connector-ref="SPTSQL01_APPS_custom_app_data" doc:name="Mark Complete"/>
</processor-chain>
</when>
<when expression="#[message.payload.IS_ZERO_OR_NEGATIVE=='1']">
<processor-chain>
<collection-aggregator timeout="3000" failOnTimeout="false" doc:name="Zero"/>
<smtp:outbound-endpoint to= "ben@example.com" from="muleservice@example.com" subject="[Invoice Workflow] Zero Dollar " responseTimeout= "10000" doc:name="Zero Dollar" host="mail.example.com"/>
<jdbc:outbound-endpoint exchange-pattern="one-way" queryKey="MARK_COMPLETE" queryTimeout="-1" connector-ref="SPTSQL01_APPS_custom_app_data" doc:name="Mark Complete"/>
</processor-chain>
</when>
<when expression="#[message.payload.LHF_INVOICE_METHOD=='TRUCK']">
<processor-chain>
<collection-aggregator timeout="4000" failOnTimeout="false" doc:name="Truck"/>
<smtp:outbound-endpoint to= "ben@example.com" from="muleservice@example.com" subject="[Invoice Workflow] Trucking" responseTimeout= "10000" doc:name="Trucking" host="mail.example.com"/>
<jdbc:outbound-endpoint exchange-pattern="one-way" queryKey="MARK_COMPLETE" queryTimeout="-1" connector-ref="SPTSQL01_APPS_custom_app_data" doc:name="Mark Complete"/>
</processor-chain>
</when>
<otherwise>
<processor-chain>
<smtp:outbound-endpoint host="mail.example.com" to="ben@example.com" from="muleservice@example.com" subject="[Invoice Workflow] RSForms Email" responseTimeout="10000" doc:name="RSForms"/>
<message-properties-transformer overwrite="true" doc:name="Message Properties">
<add-message-property key="MULE_CORRELATION_ID" value="#[message.payload.CORRELATION_ID_2]"/>
</message-properties-transformer>
<choice doc:name="Choice">
<when expression="#[message.payload.LHF_INVOICE_METHOD == 'SPEC']">
<processor-chain>
<collection-aggregator timeout="5000" failOnTimeout="false" doc:name="Special"/>
<smtp:outbound-endpoint host="mail.example.com" to="ben@example.com" from="muleservice@example.com" subject="[Invoice Workflow] Special Handling" responseTimeout="10000" doc:name="Special"/>
<jdbc:outbound-endpoint exchange-pattern="one-way" queryKey="MARK_COMPLETE" queryTimeout="-1" connector-ref="SPTSQL01_APPS_custom_app_data" doc:name="Mark Complete"/>
</processor-chain>
</when>
<when expression="#[message.payload.LHF_INVOICE_METHOD == 'EMAIL']">
<processor-chain>
<collection-aggregator timeout="6000" failOnTimeout="false" doc:name="Do Nothing"/>
<smtp:outbound-endpoint host="mail.example.com" to="ben@example.com" from="muleservice@example.com" subject="[Invoice Workflow] Do nothing" responseTimeout="10000" doc:name="Do Nothing"/>
<jdbc:outbound-endpoint exchange-pattern="one-way" queryKey="MARK_COMPLETE" queryTimeout="-1" connector-ref="SPTSQL01_APPS_custom_app_data" doc:name="Mark Complete"/>
</processor-chain>
</when>
<when expression="#[message.payload.LHF_INVOICE_METHOD == 'MAILUS']">
<processor-chain>
<collection-aggregator timeout="7000" failOnTimeout="false" doc:name="US Mail"/>
<smtp:outbound-endpoint host="mail.example.com" to="ben@example.com" from="muleservice@example.com" subject="[Invoice Workflow] US Mail" responseTimeout="10000" doc:name="US Mail"/>
<jdbc:outbound-endpoint exchange-pattern="one-way" queryKey="MARK_COMPLETE" queryTimeout="-1" connector-ref="SPTSQL01_APPS_custom_app_data" doc:name="Mark Complete"/>
</processor-chain>
</when>
<when expression="#[message.payload.LHF_INVOICE_METHOD == 'MAILCN']">
<processor-chain>
<collection-aggregator timeout="8000" failOnTimeout="false" doc:name="Intl Mail"/>
<smtp:outbound-endpoint host="mail.example.com" to="ben@example.com" from="muleservice@example.com" subject="[Invoice Workflow] Intl Mail" responseTimeout="10000" doc:name="Intl Mail"/>
<jdbc:outbound-endpoint exchange-pattern="one-way" queryKey="MARK_COMPLETE" queryTimeout="-1" connector-ref="SPTSQL01_APPS_custom_app_data" doc:name="Mark Complete"/>
</processor-chain>
</when>
<when expression="#[message.payload.LHF_INVOICE_METHOD == 'EDI']">
<processor-chain>
<collection-aggregator timeout="9000" failOnTimeout="false" doc:name="EDI"/>
<smtp:outbound-endpoint host="mail.example.com" to="ben@example.com" from="muleservice@example.com" subject="[Invoice Workflow] EDI" responseTimeout="10000" doc:name="EDI"/>
<jdbc:outbound-endpoint exchange-pattern="one-way" queryKey="MARK_COMPLETE" queryTimeout="-1" connector-ref="SPTSQL01_APPS_custom_app_data" doc:name="Mark Complete"/>
</processor-chain>
</when>
<otherwise>
<processor-chain>
<collection-aggregator timeout="10000" failOnTimeout="false" doc:name="Exceptions"/>
<smtp:outbound-endpoint host="mail.example.com" to="ben@example.com" from="muleservice@example.com" subject="[Invoice Workflow] Exceptions" responseTimeout="10000" doc:name="Exceptions"/>
<jdbc:outbound-endpoint exchange-pattern="one-way" queryKey="MARK_COMPLETE" queryTimeout="-1" connector-ref="SPTSQL01_APPS_custom_app_data" doc:name="Mark Complete"/>
</processor-chain>
</otherwise>
</choice>
</processor-chain>
</otherwise>
</choice>
</flow >
I added loggers after each WHEN branch before the aggregators and the transactions go to the proper branch before aggregation. Here is actual data, as far as I can tell the Mule Correlation IDs are unique to each aggregator. Here is some actual data sent through with what happened and what I expected:
BEFORE Aggregation (all correct)
Logger Inactive
{INVOICE_NUMBER=408908, SYSTEM_DATE=2012-12-19 16:36:50.0, DIVISION=1, LHF_INVOICE_METHOD=INACTV, RUN_NUMBER=4862651810, ROWID=oracle.sql.ROWID@5a72f4f7, COMPANY_CODE=1, STATUS_FLAG=N, USERNAME=BBRYAN, CORRELATION_ID=INACTV48626518101, CORRELATION_ID_2=INACTV486265181012, IS_ZERO_OR_NEGATIVE=1}
{INVOICE_NUMBER=406749, SYSTEM_DATE=2012-12-19 16:36:50.0, DIVISION=1, LHF_INVOICE_METHOD=INACTV, RUN_NUMBER=4862651810, ROWID=oracle.sql.ROWID@c0a628c, COMPANY_CODE=1, STATUS_FLAG=N, USERNAME=BBRYAN, CORRELATION_ID=INACTV48626518101, CORRELATION_ID_2=INACTV486265181012, IS_ZERO_OR_NEGATIVE=1}
{INVOICE_NUMBER=408691, SYSTEM_DATE=2012-12-19 16:36:50.0, DIVISION=1, LHF_INVOICE_METHOD=INACTV, RUN_NUMBER=4862651810, ROWID=oracle.sql.ROWID@42652ecc, COMPANY_CODE=1, STATUS_FLAG=N, USERNAME=BBRYAN, CORRELATION_ID=INACTV48626518100, CORRELATION_ID_2=INACTV486265181002, IS_ZERO_OR_NEGATIVE=0}
Logger for Zero or Negative
{INVOICE_NUMBER=409061, SYSTEM_DATE=2012-12-19 16:36:50.0, DIVISION=1, LHF_INVOICE_METHOD=MAILUS, RUN_NUMBER=4862651810, ROWID=oracle.sql.ROWID@7933b36f, COMPANY_CODE=1, STATUS_FLAG=N, USERNAME=BBRYAN, CORRELATION_ID=MAILUS48626518101, CORRELATION_ID_2=MAILUS486265181012, IS_ZERO_OR_NEGATIVE=1}
{INVOICE_NUMBER=410968, SYSTEM_DATE=2012-12-19 16:36:50.0, DIVISION=1, LHF_INVOICE_METHOD=EDI, RUN_NUMBER=4862651810, ROWID=oracle.sql.ROWID@7a9b36be, COMPANY_CODE=1, STATUS_FLAG=N, USERNAME=BBRYAN, CORRELATION_ID=EDI48626518101, CORRELATION_ID_2=EDI486265181012, IS_ZERO_OR_NEGATIVE=1}
AFTER Aggregation
Inactive 1: (these two went to correct route)
[{INVOICE_NUMBER=408908, SYSTEM_DATE=2012-12-19 16:36:50.0, DIVISION=1, LHF_INVOICE_METHOD=INACTV, RUN_NUMBER=4862651810, ROWID=oracle.sql.ROWID@5a72f4f7, COMPANY_CODE=1, STATUS_FLAG=N, USERNAME=BBRYAN, CORRELATION_ID=INACTV48626518101, CORRELATION_ID_2=INACTV486265181012, IS_ZERO_OR_NEGATIVE=1},
{INVOICE_NUMBER=406749, SYSTEM_DATE=2012-12-19 16:36:50.0, DIVISION=1, LHF_INVOICE_METHOD=INACTV, RUN_NUMBER=4862651810, ROWID=oracle.sql.ROWID@c0a628c, COMPANY_CODE=1, STATUS_FLAG=N, USERNAME=BBRYAN, CORRELATION_ID=INACTV48626518101, CORRELATION_ID_2=INACTV486265181012, IS_ZERO_OR_NEGATIVE=1}]
Zero Dollar: (this one is correct)
[{INVOICE_NUMBER=409061, SYSTEM_DATE=2012-12-19 16:36:50.0, DIVISION=1, LHF_INVOICE_METHOD=MAILUS, RUN_NUMBER=4862651810, ROWID=oracle.sql.ROWID@7933b36f, COMPANY_CODE=1, STATUS_FLAG=N, USERNAME=BBRYAN, CORRELATION_ID=MAILUS48626518101, CORRELATION_ID_2=MAILUS486265181012, IS_ZERO_OR_NEGATIVE=1}]
Trucking: (this should have gone to Zero Dollar)
[{INVOICE_NUMBER=410968, SYSTEM_DATE=2012-12-19 16:36:50.0, DIVISION=1, LHF_INVOICE_METHOD=EDI, RUN_NUMBER=4862651810, ROWID=oracle.sql.ROWID@7a9b36be, COMPANY_CODE=1, STATUS_FLAG=N, USERNAME=BBRYAN, CORRELATION_ID=EDI48626518101, CORRELATION_ID_2=EDI486265181012, IS_ZERO_OR_NEGATIVE=1}]
Special Handling: (should have gone to "Inactive 1")
[{INVOICE_NUMBER=408691, SYSTEM_DATE=2012-12-19 16:36:50.0, DIVISION=1, LHF_INVOICE_METHOD=INACTV, RUN_NUMBER=4862651810, ROWID=oracle.sql.ROWID@42652ecc, COMPANY_CODE=1, STATUS_FLAG=N, USERNAME=BBRYAN, CORRELATION_ID=INACTV48626518100, CORRELATION_ID_2=INACTV486265181002, IS_ZERO_OR_NEGATIVE=0}]