0

So, I'm trying to route invoices based on a field in the database using the Choice Flow Control. This works fine without using a collection aggregator, however I want to use the aggregator to group the transactions into a single email. However, when I add the collection aggregators the routing gets all mixed up. For example, in my last test run an invoice with LHF_INVOICE_METHOD='INACTV' and IS_ZERO_OR_NEGATIVE='1' got routed to "Trucking" (it routes correctly to "Inactive" without collection aggregation). I think the collection aggregators are causing the routes to get crossed with each other. I setup my MULE_CORRELATION_ID to include LHF_INVOICE_METHOD and IS_ZERO_OR_NEGATIVE so the collection aggregators have their own correlation IDs.

<flow name="InvoiceWorkflow" doc:name="InvoiceWorkflow" processingStrategy= "synchronous">
    <jdbc:inbound-endpoint queryKey="GetUnprocessedInvoices" queryTimeout="10000" pollingFrequency= "1000" connector-ref ="Database" doc:name="Get invoice run" >
        <jdbc:query key="GetUnprocessedInvoices" value="SELECT        COMPANY_CODE, DIVISION, INVOICE_NUMBER,&#13;&#10;LHF_INVOICE_METHOD, STATUS_FLAG,&#13;&#10;SYSTEM_DATE, USERNAME, RUN_NUMBER, ROWID, IS_ZERO_OR_NEGATIVE,&#13;&#10;LHF_INVOICE_METHOD || RUN_NUMBER || IS_ZERO_OR_NEGATIVE AS CORRELATION_ID,&#13;&#10;LHF_INVOICE_METHOD || RUN_NUMBER || IS_ZERO_OR_NEGATIVE || '2' AS CORRELATION_ID_2&#13;&#10;FROM             FIN.LHF_INVOICE_WORKFLOW &#13;&#10; WHERE         (STATUS_FLAG = 'N')"/>
    </jdbc:inbound-endpoint>
    <jdbc:outbound-endpoint exchange-pattern="one-way" queryKey="insert_invoice_run" queryTimeout= "10000" connector-ref="SPTSQL01_APPS_custom_app_data" doc:name= "Load to custom_app_data">
        <jdbc:query key="insert_invoice_run" value="INSERT INTO lhf_ros_invoice_workflow ([COMPANY_CODE] ,[DIVISION] ,[INVOICE_NUMBER] ,[LHF_INVOICE_METHOD] ,[STATUS_FLAG] ,[SYSTEM_DATE] ,[USERNAME] ,[RUN_NUMBER] ,[IS_ZERO_OR_NEGATIVE]) VALUES (#[map-payload:COMPANY_CODE] ,#[map-payload:DIVISION] ,#[map-payload:INVOICE_NUMBER] ,#[map-payload:LHF_INVOICE_METHOD], #[map-payload:STATUS_FLAG] ,#[map-payload:SYSTEM_DATE] ,#[map-payload:USERNAME] ,#[map-payload:RUN_NUMBER] ,#[map-payload:IS_ZERO_OR_NEGATIVE])"/>
    </jdbc:outbound-endpoint>
    <jdbc:outbound-endpoint exchange-pattern="one-way" queryKey="MarkAsProcessed" queryTimeout= "10000" connector-ref="DatabaseMuleLogin" doc:name= "Mark Processed in Ross" >
        <jdbc:query key="MarkAsProcessed" value="UPDATE       FIN.LHF_INVOICE_WORKFLOW SET                 STATUS_FLAG = 'P' WHERE        (ROWID = #[map-payload:ROWID])"/>
    </jdbc:outbound-endpoint>
    <message-properties-transformer doc:name="Message Properties">
        <add-message-property key="MULE_CORRELATION_GROUP_SIZE" value="1000"/>
        <add-message-property key="MULE_CORRELATION_ID" value="#[message.payload.CORRELATION_ID]"/>
    </message-properties-transformer>
    <choice doc:name="Choice" >
        <when expression="#[message.payload.LHF_INVOICE_METHOD == 'INACTV']">
            <processor-chain>
                <collection-aggregator failOnTimeout="false" doc:name="Inactive" timeout="2000"/>
                <smtp:outbound-endpoint  responseTimeout="10000" doc:name="Inactive" from="muleservice@example.com"  subject="[Invoice Workflow] Inactive 1" to="ben@example.com" host="mail.example.com"/>
                <jdbc:outbound-endpoint exchange-pattern="one-way" queryKey="MARK_COMPLETE" queryTimeout="-1" connector-ref="SPTSQL01_APPS_custom_app_data" doc:name="Mark Complete"/>
            </processor-chain>
        </when>
        <when expression="#[message.payload.IS_ZERO_OR_NEGATIVE=='1']">
            <processor-chain>
                <collection-aggregator timeout="3000" failOnTimeout="false" doc:name="Zero"/>
                <smtp:outbound-endpoint to= "ben@example.com" from="muleservice@example.com" subject="[Invoice Workflow] Zero Dollar " responseTimeout= "10000"  doc:name="Zero Dollar" host="mail.example.com"/>
                <jdbc:outbound-endpoint exchange-pattern="one-way" queryKey="MARK_COMPLETE" queryTimeout="-1" connector-ref="SPTSQL01_APPS_custom_app_data" doc:name="Mark Complete"/>
            </processor-chain>
        </when>
        <when expression="#[message.payload.LHF_INVOICE_METHOD=='TRUCK']">
            <processor-chain>
                <collection-aggregator timeout="4000" failOnTimeout="false" doc:name="Truck"/>
                <smtp:outbound-endpoint to= "ben@example.com" from="muleservice@example.com" subject="[Invoice Workflow] Trucking" responseTimeout= "10000"  doc:name="Trucking" host="mail.example.com"/>
                <jdbc:outbound-endpoint exchange-pattern="one-way" queryKey="MARK_COMPLETE" queryTimeout="-1" connector-ref="SPTSQL01_APPS_custom_app_data" doc:name="Mark Complete"/>
            </processor-chain>
        </when>
        <otherwise>
            <processor-chain>
                <smtp:outbound-endpoint host="mail.example.com" to="ben@example.com" from="muleservice@example.com" subject="[Invoice Workflow] RSForms Email" responseTimeout="10000" doc:name="RSForms"/>
                <message-properties-transformer overwrite="true" doc:name="Message Properties">
                    <add-message-property key="MULE_CORRELATION_ID" value="#[message.payload.CORRELATION_ID_2]"/>
                </message-properties-transformer>
                <choice doc:name="Choice">
                    <when expression="#[message.payload.LHF_INVOICE_METHOD == 'SPEC']">
                        <processor-chain>
                            <collection-aggregator timeout="5000" failOnTimeout="false" doc:name="Special"/>
                            <smtp:outbound-endpoint host="mail.example.com" to="ben@example.com" from="muleservice@example.com" subject="[Invoice Workflow] Special Handling" responseTimeout="10000" doc:name="Special"/>
                            <jdbc:outbound-endpoint exchange-pattern="one-way" queryKey="MARK_COMPLETE" queryTimeout="-1" connector-ref="SPTSQL01_APPS_custom_app_data" doc:name="Mark Complete"/>
                        </processor-chain>
                    </when>
                    <when expression="#[message.payload.LHF_INVOICE_METHOD == 'EMAIL']">
                        <processor-chain>
                            <collection-aggregator timeout="6000" failOnTimeout="false" doc:name="Do Nothing"/>
                            <smtp:outbound-endpoint host="mail.example.com" to="ben@example.com" from="muleservice@example.com" subject="[Invoice Workflow] Do nothing" responseTimeout="10000" doc:name="Do Nothing"/>
                            <jdbc:outbound-endpoint exchange-pattern="one-way" queryKey="MARK_COMPLETE" queryTimeout="-1" connector-ref="SPTSQL01_APPS_custom_app_data" doc:name="Mark Complete"/>
                        </processor-chain>
                    </when>
                    <when expression="#[message.payload.LHF_INVOICE_METHOD == 'MAILUS']">
                        <processor-chain>
                            <collection-aggregator timeout="7000" failOnTimeout="false" doc:name="US Mail"/>
                            <smtp:outbound-endpoint host="mail.example.com" to="ben@example.com" from="muleservice@example.com" subject="[Invoice Workflow] US Mail" responseTimeout="10000" doc:name="US Mail"/>
                            <jdbc:outbound-endpoint exchange-pattern="one-way" queryKey="MARK_COMPLETE" queryTimeout="-1" connector-ref="SPTSQL01_APPS_custom_app_data" doc:name="Mark Complete"/>
                        </processor-chain>
                    </when>
                    <when expression="#[message.payload.LHF_INVOICE_METHOD == 'MAILCN']">
                        <processor-chain>
                            <collection-aggregator timeout="8000" failOnTimeout="false" doc:name="Intl Mail"/>
                            <smtp:outbound-endpoint host="mail.example.com" to="ben@example.com" from="muleservice@example.com" subject="[Invoice Workflow] Intl Mail" responseTimeout="10000" doc:name="Intl Mail"/>
                            <jdbc:outbound-endpoint exchange-pattern="one-way" queryKey="MARK_COMPLETE" queryTimeout="-1" connector-ref="SPTSQL01_APPS_custom_app_data" doc:name="Mark Complete"/>
                        </processor-chain>
                    </when>
                    <when expression="#[message.payload.LHF_INVOICE_METHOD == 'EDI']">
                        <processor-chain>
                            <collection-aggregator timeout="9000" failOnTimeout="false" doc:name="EDI"/>
                            <smtp:outbound-endpoint host="mail.example.com" to="ben@example.com" from="muleservice@example.com" subject="[Invoice Workflow] EDI" responseTimeout="10000" doc:name="EDI"/>
                            <jdbc:outbound-endpoint exchange-pattern="one-way" queryKey="MARK_COMPLETE" queryTimeout="-1" connector-ref="SPTSQL01_APPS_custom_app_data" doc:name="Mark Complete"/>
                        </processor-chain>
                    </when>
                    <otherwise>
                        <processor-chain>
                            <collection-aggregator timeout="10000" failOnTimeout="false" doc:name="Exceptions"/>
                            <smtp:outbound-endpoint host="mail.example.com" to="ben@example.com" from="muleservice@example.com" subject="[Invoice Workflow] Exceptions" responseTimeout="10000" doc:name="Exceptions"/>
                            <jdbc:outbound-endpoint exchange-pattern="one-way" queryKey="MARK_COMPLETE" queryTimeout="-1" connector-ref="SPTSQL01_APPS_custom_app_data" doc:name="Mark Complete"/>
                        </processor-chain>
                    </otherwise>
                </choice>
            </processor-chain>
        </otherwise>
    </choice>
</flow >

I added loggers after each WHEN branch before the aggregators and the transactions go to the proper branch before aggregation. Here is actual data, as far as I can tell the Mule Correlation IDs are unique to each aggregator. Here is some actual data sent through with what happened and what I expected:

BEFORE Aggregation (all correct)

Logger Inactive

{INVOICE_NUMBER=408908, SYSTEM_DATE=2012-12-19 16:36:50.0, DIVISION=1, LHF_INVOICE_METHOD=INACTV, RUN_NUMBER=4862651810, ROWID=oracle.sql.ROWID@5a72f4f7, COMPANY_CODE=1, STATUS_FLAG=N, USERNAME=BBRYAN, CORRELATION_ID=INACTV48626518101, CORRELATION_ID_2=INACTV486265181012, IS_ZERO_OR_NEGATIVE=1}

{INVOICE_NUMBER=406749, SYSTEM_DATE=2012-12-19 16:36:50.0, DIVISION=1, LHF_INVOICE_METHOD=INACTV, RUN_NUMBER=4862651810, ROWID=oracle.sql.ROWID@c0a628c, COMPANY_CODE=1, STATUS_FLAG=N, USERNAME=BBRYAN, CORRELATION_ID=INACTV48626518101, CORRELATION_ID_2=INACTV486265181012, IS_ZERO_OR_NEGATIVE=1}

{INVOICE_NUMBER=408691, SYSTEM_DATE=2012-12-19 16:36:50.0, DIVISION=1, LHF_INVOICE_METHOD=INACTV, RUN_NUMBER=4862651810, ROWID=oracle.sql.ROWID@42652ecc, COMPANY_CODE=1, STATUS_FLAG=N, USERNAME=BBRYAN, CORRELATION_ID=INACTV48626518100, CORRELATION_ID_2=INACTV486265181002, IS_ZERO_OR_NEGATIVE=0}

Logger for Zero or Negative

{INVOICE_NUMBER=409061, SYSTEM_DATE=2012-12-19 16:36:50.0, DIVISION=1, LHF_INVOICE_METHOD=MAILUS, RUN_NUMBER=4862651810, ROWID=oracle.sql.ROWID@7933b36f, COMPANY_CODE=1, STATUS_FLAG=N, USERNAME=BBRYAN, CORRELATION_ID=MAILUS48626518101, CORRELATION_ID_2=MAILUS486265181012, IS_ZERO_OR_NEGATIVE=1}

{INVOICE_NUMBER=410968, SYSTEM_DATE=2012-12-19 16:36:50.0, DIVISION=1, LHF_INVOICE_METHOD=EDI, RUN_NUMBER=4862651810, ROWID=oracle.sql.ROWID@7a9b36be, COMPANY_CODE=1, STATUS_FLAG=N, USERNAME=BBRYAN, CORRELATION_ID=EDI48626518101, CORRELATION_ID_2=EDI486265181012, IS_ZERO_OR_NEGATIVE=1}

AFTER Aggregation

Inactive 1: (these two went to correct route)

[{INVOICE_NUMBER=408908, SYSTEM_DATE=2012-12-19 16:36:50.0, DIVISION=1, LHF_INVOICE_METHOD=INACTV, RUN_NUMBER=4862651810, ROWID=oracle.sql.ROWID@5a72f4f7, COMPANY_CODE=1, STATUS_FLAG=N, USERNAME=BBRYAN, CORRELATION_ID=INACTV48626518101, CORRELATION_ID_2=INACTV486265181012, IS_ZERO_OR_NEGATIVE=1},

{INVOICE_NUMBER=406749, SYSTEM_DATE=2012-12-19 16:36:50.0, DIVISION=1, LHF_INVOICE_METHOD=INACTV, RUN_NUMBER=4862651810, ROWID=oracle.sql.ROWID@c0a628c, COMPANY_CODE=1, STATUS_FLAG=N, USERNAME=BBRYAN, CORRELATION_ID=INACTV48626518101, CORRELATION_ID_2=INACTV486265181012, IS_ZERO_OR_NEGATIVE=1}]

Zero Dollar: (this one is correct)

[{INVOICE_NUMBER=409061, SYSTEM_DATE=2012-12-19 16:36:50.0, DIVISION=1, LHF_INVOICE_METHOD=MAILUS, RUN_NUMBER=4862651810, ROWID=oracle.sql.ROWID@7933b36f, COMPANY_CODE=1, STATUS_FLAG=N, USERNAME=BBRYAN, CORRELATION_ID=MAILUS48626518101, CORRELATION_ID_2=MAILUS486265181012, IS_ZERO_OR_NEGATIVE=1}]

Trucking: (this should have gone to Zero Dollar)

[{INVOICE_NUMBER=410968, SYSTEM_DATE=2012-12-19 16:36:50.0, DIVISION=1, LHF_INVOICE_METHOD=EDI, RUN_NUMBER=4862651810, ROWID=oracle.sql.ROWID@7a9b36be, COMPANY_CODE=1, STATUS_FLAG=N, USERNAME=BBRYAN, CORRELATION_ID=EDI48626518101, CORRELATION_ID_2=EDI486265181012, IS_ZERO_OR_NEGATIVE=1}]

Special Handling: (should have gone to "Inactive 1")

[{INVOICE_NUMBER=408691, SYSTEM_DATE=2012-12-19 16:36:50.0, DIVISION=1, LHF_INVOICE_METHOD=INACTV, RUN_NUMBER=4862651810, ROWID=oracle.sql.ROWID@42652ecc, COMPANY_CODE=1, STATUS_FLAG=N, USERNAME=BBRYAN, CORRELATION_ID=INACTV48626518100, CORRELATION_ID_2=INACTV486265181002, IS_ZERO_OR_NEGATIVE=0}]

4

1 回答 1

2

Routing doesn't get affected but instead the message hits the right collection-aggregator but gets aggregated in the wrong group, thus ends delivered by another collection-aggregator when it times out.

This is because all the aggregators use the default shared message store which makes Mule completely mix messages together. This default behaviour is wrong IMO but to workaround the issue configure a unique storePrefix on each aggregator.

For example:

<when expression="#[message.payload.LHF_INVOICE_METHOD == 'INACTV']">
    <processor-chain>
        <collection-aggregator storePrefix="inactive" failOnTimeout="false" doc:name="Inactive" timeout="2000"/>
        <smtp:outbound-endpoint  responseTimeout="10000" doc:name="Inactive" from="muleservice@example.com"  subject="[Invoice Workflow] Inactive 1" to="ben@example.com" host="mail.example.com"/>
        <jdbc:outbound-endpoint exchange-pattern="one-way" queryKey="MARK_COMPLETE" queryTimeout="-1" connector-ref="SPTSQL01_APPS_custom_app_data" doc:name="Mark Complete"/>
    </processor-chain>
</when>

and so on and so forth.

于 2012-12-19T23:54:05.267 回答