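I have upgraded from Mule 3.2.1 to 3.3.1 and I am facing the following issue. I am using a custom aggregator and a custom transformer to aggregate multiple files for processing. The problem is that after the first aggregation, when I drop another set of files, only the transformer class is called and no other component is executed. There is no error on the console either. When I restart Mule, the process again works only for the first set of files.
My transformer class is: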
public class FileCorrelationTransformer extends AbstractMessageTransformer {

    @Override
    public Object transformMessage(MuleMessage message, String arg1)
            throws TransformerException {
        System.out.println("FileCorelationTransformer called");
        System.out.println("=============================");
        System.out.println("Extract Name = "
                + message.getOutboundProperty("originalFilename"));
        final String fileName = message.getOutboundProperty("filename");
        final String extractName = Generate.extractName(fileName);
        final ArrayList<ExtractVO> extractVOs = ExtractDAO.getInstance()
                .load(extractName);
        System.out.println("Group Size: " + extractVOs.get(0).getNoOfParts());
        System.out.println("=============================");
        message.setCorrelationId(extractName);
        message.setCorrelationGroupSize(extractVOs.get(0).getNoOfParts());
        return message;
    }
}
My aggregator class is:
public class FileCollectionAggregator extends AbstractAggregator {

    @Override
    protected EventCorrelatorCallback getCorrelatorCallback(
            MuleContext muleContext) {
        return new CollectionCorrelatorCallback(muleContext, false, "") {
            @Override
            public MuleEvent aggregateEvents(EventGroup events)
                    throws AggregationException {
                CopyOnWriteArrayList<File> list = new CopyOnWriteArrayList<File>();
                try {
                    for (Iterator<MuleEvent> iterator = events.iterator(); iterator
                            .hasNext();) {
                        MuleEvent event = iterator.next();
                        try {
                            list.add(new File(event.transformMessageToString()));
                        } catch (TransformerException e) {
                            throw new AggregationException(events, null, e);
                        }
                    }
                } catch (ObjectStoreException e) {
                    throw new AggregationException(events, null, e);
                }
                return new DefaultMuleEvent(new DefaultMuleMessage(list,
                        muleContext), events.getMessageCollectionEvent());
            }
        };
    }
}
My Mule flow is:
<vm:connector name="ConnectorSinglePartMonthly"
    dynamicNotification="true" doc:name="VM" />

<custom-transformer
    class="com.transformers.FileCorrelationTransformer" name="fileMultiPartCorrelationTransformer"
    doc:name="Java" />

<flow name="CountTP" doc:name="CountTP">
    <file:inbound-endpoint path="c:/test/input"
        connector-ref="fileConnector" doc:name="Count CSV">
        <file:filename-regex-filter pattern="UGJERT[0-9]*.csv"
            caseSensitive="false" />
    </file:inbound-endpoint>
    <vm:outbound-endpoint connector-ref="ConnectorMultiPartMonthly"
        path="monthlyMultiPartFiles" transformer-refs="fileMultiPartCorrelationTransformer"
        doc:name="TP Multi VM Out" />
    <logger message="Done with the Flow!" doc:name="Logger" />
</flow>

<flow name="MultiPartMonthlyFlow" doc:name="MultiPartMonthlyFlow">
    <vm:inbound-endpoint connector-ref="ConnectorMultiPartMonthly"
        path="monthlyMultiPartFiles" doc:name="TP Multi Monthly VM In" />
    <custom-aggregator class="com.routing.FileCollectionAggregator"
        doc:name="Custom Aggregator" />
    <component doc:name="Csv Reader">
        <prototype-object
            class="com.utility.CsvReader" />
    </component>
</flow>
My console output is:
After the first execution:
INFO 2013-07-19 16:51:02,296 [[test].fileConnector.receiver.05] org.mule.transport.file.FileMessageReceiver: Lock obtained on file: C:\test\input\UGJERT12.csv
INFO 2013-07-19 16:51:02,311 [[test].fileConnector.receiver.05] org.mule.transport.file.FileMessageReceiver: Lock obtained on file: C:\test\input\UGJERT22.csv
FileCorelationTransformer called
FileCorelationTransformer called
=============================
=============================
Extract Name = UGJERT12.csv
Extract Name = UGJERT22.csv
Group Size: 2
=============================
Group Size: 2
=============================
INFO 2013-07-19 16:51:02,421 [[test].ConnectorMultiPartMonthly.dispatcher.01] org.mule.lifecycle.AbstractLifecycleManager: Initialising: 'ConnectorMultiPartMonthly.dispatcher.6508195'. Object is: VMMessageDispatcher
INFO 2013-07-19 16:51:02,421 [[test].ConnectorMultiPartMonthly.dispatcher.02] org.mule.lifecycle.AbstractLifecycleManager: Initialising: 'ConnectorMultiPartMonthly.dispatcher.29350820'. Object is: VMMessageDispatcher
INFO 2013-07-19 16:51:02,421 [[test].ConnectorMultiPartMonthly.dispatcher.01] org.mule.lifecycle.AbstractLifecycleManager: Starting: 'ConnectorMultiPartMonthly.dispatcher.6508195'. Object is: VMMessageDispatcher
INFO 2013-07-19 16:51:02,421 [[test].ConnectorMultiPartMonthly.dispatcher.02] org.mule.lifecycle.AbstractLifecycleManager: Starting: 'ConnectorMultiPartMonthly.dispatcher.29350820'. Object is: VMMessageDispatcher
Process execution starts and extractName is = UGJERT
After the second execution:
INFO 2013-07-19 16:53:18,530 [[test].fileConnector.receiver.09] org.mule.transport.file.FileMessageReceiver: Lock obtained on file: C:\test\input\UGJERT12.csv
INFO 2013-07-19 16:53:18,546 [[test].fileConnector.receiver.09] org.mule.transport.file.FileMessageReceiver: Lock obtained on file: C:\test\input\UGJERT22.csv
FileCorelationTransformer called
=============================
Extract Name = UGJERT12.csv
Group Size: 2
=============================
FileCorelationTransformer called
=============================
Extract Name = UGJERT22.csv
Group Size: 2
=============================
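For each group of files, the correlation ID is the name of the file, which is unique across different file groups. Can anyone tell me what is wrong with the code?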
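For reference, Generate.extractName is not shown above. Below is a minimal sketch of what such a helper might look like, assuming it strips the trailing digits and the .csv extension from the file name. That behaviour is only inferred from the log line "extractName is = UGJERT"; the class name CorrelationIdSketch and the regular expression are hypothetical, and the real implementation may differ.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-in for Generate.extractName; not the original helper.
final class CorrelationIdSketch {

    // Assumed file name shape: a run of letters, optional digits, then ".csv".
    private static final Pattern NAME = Pattern.compile(
            "^([A-Za-z]+)[0-9]*\\.csv$", Pattern.CASE_INSENSITIVE);

    // Returns the leading letters of the file name, or the name unchanged
    // when it does not match the assumed shape.
    static String extractName(String fileName) {
        Matcher m = NAME.matcher(fileName);
        return m.matches() ? m.group(1) : fileName;
    }

    public static void main(String[] args) {
        // Both parts of one set map to the same value, which is what
        // the transformer passes to message.setCorrelationId(...).
        System.out.println(extractName("UGJERT12.csv")); // prints UGJERT
        System.out.println(extractName("UGJERT22.csv")); // prints UGJERT
    }
}
```

Under this assumption, UGJERT12.csv and UGJERT22.csv both produce the correlation ID UGJERT, so the two parts of a set end up in the same event group.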