I need to use the timeout value passed in the message header as the value for the collection aggregator.
<collection-aggregator timeout="#[new Integer(header:SESSION:TIMEOUT)]" failOnTimeout="false" doc:name="Collection Aggregator"/>
I tried to convert it to a number but it wouldn't take it. Can someone post an example of how I would do it?
Mule Config
<flow name="workIn">
<http:inbound-endpoint exchange-pattern="request-response" address="http://localhost:5455/SQSearcher" doc:name="HTTP">
<cxf:jaxws-service serviceClass="com.ace.st.tf.web.ws.sq.ISQSearcher" doc:name="SQ Searcher"/>
<message-properties-transformer scope="session">
<add-message-property value="1" key="DOBSearch" />
</message-properties-transformer>
</http:inbound-endpoint>
<logger message="SQSearcher Inbound Payload-> #[payload]" level="INFO" doc:name="Logger"/>
<message-properties-transformer scope="session">
<add-message-property value="#[new Integer(payload.getTimeout())]" key="TIMEOUT" />
</message-properties-transformer>
<custom-transformer class="com.ace.st.tf.web.ws.sq.transformer.CCDMessageTransformer" doc:name="Java"/>
<request-reply storePrefix="workStore" doc:name="Put message for processing">
<vm:outbound-endpoint path="dispatchIn">
<message-properties-transformer scope="outbound">
<delete-message-property key="MULE_REPLYTO" />
<add-message-property value="#[message:id]" key="MULE_CORRELATION_ID" />
</message-properties-transformer>
</vm:outbound-endpoint>
<vm:inbound-endpoint path="dispatchOut" />
</request-reply>
<custom-transformer class="com.ace.st.tf.web.ws.sq.transformer.CcdSearchResultTransformer" doc:name="Java"/>
</flow>
<flow name="workDispatcher">
<vm:inbound-endpoint path="dispatchIn" />
<flow-ref name="workWorker" />
</flow>
<flow name="workWorker">
<message-properties-transformer scope="outbound" doc:name="Remember correlation">
<add-message-property value="#[header:OUTBOUND:MULE_CORRELATION_ID]" key="MULE_CORRELATION_ID" />
<add-message-property value="2" key="MULE_CORRELATION_GROUP_SIZE" />
</message-properties-transformer>
<all>
<vm:outbound-endpoint path="vm.flow.1"/>
<vm:outbound-endpoint path="vm.flow.2"/>
</all>
</flow>
<flow name="workAggregator">
<vm:inbound-endpoint path="vm.aggregate" />
<logger message="payload to aggregate-> #[payload]" level="INFO" doc:name="Logger"/>
<custom-aggregator class="com.ace.st.tf.web.ws.sq.aggregator.CustomAggregator" storePrefix="header:SESSION:TIMEOUT" failOnTimeout="false"/>
<logger message="Aggregated Payload-> #[payload]" level="INFO" doc:name="Logger"/>
<vm:outbound-endpoint path="dispatchOut" />
</flow>
<flow name="CCDClient" doc:name="CCDClient">
<vm:inbound-endpoint path="vm.flow.1" doc:name="VM"/>
<logger message="CCDClient correlation id inbound-> #[header:OUTBOUND:MULE_CORRELATION_GROUP_SIZE]" level="INFO" doc:name="Logger"/>
<choice doc:name="Choice">
<when expression="header:SESSION:DOBSearch=1">
<outbound-endpoint address="http://localhost:4723/CcdSearcher" doc:name="Generic">
<cxf:jaxws-client clientClass="com.ace.st.tf.web.ws.sq.ccd.client.CcdSearcher_Service" port="CcdSearcherPort" wsdlLocation="classpath:CcdSearcher.wsdl" operation="searchByNameDob"/>
</outbound-endpoint>
</when>
<otherwise>
<expression-component doc:name="set error message">payload="NOT A DOB SEARCH"</expression-component>
</otherwise>
</choice>
<vm:outbound-endpoint path="vm.aggregate" doc:name="VM">
<message-properties-transformer scope="outbound">
<add-message-property value="#[header:INBOUND:MULE_CORRELATION_ID]" key="MULE_CORRELATION_ID" />
<add-message-property value="#[header:INBOUND:MULE_CORRELATION_GROUP_SIZE]" key="MULE_CORRELATION_GROUP_SIZE" />
</message-properties-transformer>
</vm:outbound-endpoint>
</flow>
<flow name="ATspCisClient" doc:name="ATspCisClient">
<vm:inbound-endpoint path="vm.flow.2" doc:name="VM"/>
<choice doc:name="Choice">
<when expression="header:SESSION:DOBSearch=1">
<outbound-endpoint address="http://localhost:4724/ATspCisSearcher" doc:name="Generic">
<cxf:jaxws-client clientClass="com.ace.st.tf.web.ws.sq.atspcis.client.AtspcisSearcher_Service" port="AtspcisSearcherPort" wsdlLocation="classpath:ATspCisSearcher.wsdl" operation="atspSearchByNameDob"/>
</outbound-endpoint>
</when>
<otherwise>
<expression-component doc:name="set error message">payload="NOT A DOB SEARCH"</expression-component>
</otherwise>
</choice>
<vm:outbound-endpoint path="vm.aggregate" doc:name="VM">
<message-properties-transformer scope="outbound">
<add-message-property value="#[header:INBOUND:MULE_CORRELATION_ID]" key="MULE_CORRELATION_ID" />
<add-message-property value="#[header:INBOUND:MULE_CORRELATION_GROUP_SIZE]" key="MULE_CORRELATION_GROUP_SIZE" />
</message-properties-transformer>
</vm:outbound-endpoint>
</flow>
CustomAggregator
public class CustomAggregator extends SimpleCollectionAggregator
{
public String getStorePrefix() {
return super.getStorePrefix();
}
public void setStorePrefix(String storePrefix) {
super.setStorePrefix(storePrefix);
}
@Override
protected EventCorrelatorCallback getCorrelatorCallback(MuleContext muleContext) {
return new CollectionCorrelatorCallback(muleContext,false,storePrefix) {
@Override
public MuleEvent aggregateEvents(EventGroup events) throws AggregationException {
try {
for (Iterator<MuleEvent> iterator = events.iterator(); iterator.hasNext();) {
MuleEvent event = iterator.next();
Object result = muleContext.getExpressionManager().evaluate("header:SESSION:TIMEOUT",event.getMessage());
setTimeout((java.lang.Integer)result);
}
}
catch (ObjectStoreException e) {
throw new AggregationException(events,null,e);
}
//return new DefaultMuleEvent(new DefaultMuleMessage(buffer.toString(), muleContext), events.getMessageCollectionEvent());
return super.aggregateEvents(events);
}
};
}
}