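I have upgraded from Mule 3.2.1 to 3.3.1 and am facing the following problem. I use a custom aggregator and a custom transformer to aggregate multiple files for processing. After the first aggregation, when I drop another set of files, only the transformer class is called and no other component is executed. There is no error on the console either. When I restart Mule, the process again runs only the first time.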

My transformer class is:

public class FileCorrelationTransformer extends AbstractMessageTransformer {

      @Override
      public Object transformMessage(MuleMessage message, String arg1)
                  throws TransformerException {
            System.out.println("FileCorelationTransformer called");
            System.out.println("=============================");
            System.out.println("Extract Name = "
                        + message.getOutboundProperty("originalFilename"));

            final String fileName = message.getOutboundProperty("filename");

            final String extractName = Generate.extractName(fileName);
            final ArrayList<ExtractVO> extractVOs = ExtractDAO.getInstance()
                        .load(extractName);

            System.out.println("Group Size: " + extractVOs.get(0).getNoOfParts());
            System.out.println("=============================");
            message.setCorrelationId(extractName);
            message.setCorrelationGroupSize(extractVOs.get(0).getNoOfParts());
            return message;
      }
}

My aggregator class is:

public class FileCollectionAggregator extends AbstractAggregator {

      @Override
      protected EventCorrelatorCallback getCorrelatorCallback(
                  MuleContext muleContext) {
            return new CollectionCorrelatorCallback(muleContext, false, "") {
                  @Override
                  public MuleEvent aggregateEvents(EventGroup events)
                              throws AggregationException {
                        CopyOnWriteArrayList<File> list = new CopyOnWriteArrayList<File>();
                        try {
                              for (Iterator<MuleEvent> iterator = events.iterator(); iterator
                                          .hasNext();) {
                                    MuleEvent event = iterator.next();
                                    try {
                                          list.add(new File(event.transformMessageToString()));
                                    } catch (TransformerException e) {
                                          throw new AggregationException(events, null, e);
                                    }
                              }
                        } catch (ObjectStoreException e) {
                              throw new AggregationException(events, null, e);
                        }
                        return new DefaultMuleEvent(new DefaultMuleMessage(list,
                                    muleContext), events.getMessageCollectionEvent());
                  }
            };
      }
}

My Mule flow is:

<vm:connector name="ConnectorSinglePartMonthly"
            dynamicNotification="true" doc:name="VM" />

      <custom-transformer
            class="com.transformers.FileCorrelationTransformer" name="fileMultiPartCorrelationTransformer"
            doc:name="Java" />

      <flow name="CountTP" doc:name="CountTP">
            <file:inbound-endpoint path="c:/test/input"
                  connector-ref="fileConnector" doc:name="Count CSV">
                  <file:filename-regex-filter pattern="UGJERT[0-9]*.csv"
                        caseSensitive="false" />
            </file:inbound-endpoint>
            <vm:outbound-endpoint connector-ref="ConnectorMultiPartMonthly"
                  path="monthlyMultiPartFiles" transformer-refs="fileMultiPartCorrelationTransformer"
                  doc:name="TP Multi VM Out" />
            <logger message="Done with the Flow!" doc:name="Logger" />
      </flow>

      <flow name="MultiPartMonthlyFlow" doc:name="MultiPartMonthlyFlow">
            <vm:inbound-endpoint connector-ref="ConnectorMultiPartMonthly"
                  path="monthlyMultiPartFiles" doc:name="TP Multi Monthly VM In" />

            <custom-aggregator class="com.routing.FileCollectionAggregator"
                  doc:name="Custom Aggregator" />

              <component doc:name="Csv Reader">
                   <prototype-object
                        class="com.utility.CsvReader" />
                </component>

      </flow>

My console output is:

After the first execution:

INFO  2013-07-19 16:51:02,296 [[test].fileConnector.receiver.05] org.mule.transport.file.FileMessageReceiver: Lock obtained on file: C:\test\input\UGJERT12.csv
INFO  2013-07-19 16:51:02,311 [[test].fileConnector.receiver.05] org.mule.transport.file.FileMessageReceiver: Lock obtained on file: C:\test\input\UGJERT22.csv
FileCorelationTransformer called
FileCorelationTransformer called
=============================
=============================
Extract Name = UGJERT12.csv
Extract Name = UGJERT22.csv
Group Size: 2
=============================
Group Size: 2
=============================
INFO  2013-07-19 16:51:02,421 [[test].ConnectorMultiPartMonthly.dispatcher.01] org.mule.lifecycle.AbstractLifecycleManager: Initialising: 'ConnectorMultiPartMonthly.dispatcher.6508195'. Object is: VMMessageDispatcher
INFO  2013-07-19 16:51:02,421 [[test].ConnectorMultiPartMonthly.dispatcher.02] org.mule.lifecycle.AbstractLifecycleManager: Initialising: 'ConnectorMultiPartMonthly.dispatcher.29350820'. Object is: VMMessageDispatcher
INFO  2013-07-19 16:51:02,421 [[test].ConnectorMultiPartMonthly.dispatcher.01] org.mule.lifecycle.AbstractLifecycleManager: Starting: 'ConnectorMultiPartMonthly.dispatcher.6508195'. Object is: VMMessageDispatcher
INFO  2013-07-19 16:51:02,421 [[test].ConnectorMultiPartMonthly.dispatcher.02] org.mule.lifecycle.AbstractLifecycleManager: Starting: 'ConnectorMultiPartMonthly.dispatcher.29350820'. Object is: VMMessageDispatcher

Process execution starts and extractName is = UGJERT

After the second execution:

INFO  2013-07-19 16:53:18,530 [[test].fileConnector.receiver.09] org.mule.transport.file.FileMessageReceiver: Lock obtained on file: C:\test\input\UGJERT12.csv
INFO  2013-07-19 16:53:18,546 [[test].fileConnector.receiver.09] org.mule.transport.file.FileMessageReceiver: Lock obtained on file: C:\test\input\UGJERT22.csv
FileCorelationTransformer called
=============================
Extract Name = UGJERT12.csv
Group Size: 2
=============================
FileCorelationTransformer called
=============================
Extract Name = UGJERT22.csv
Group Size: 2
=============================

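For each set of files, the correlation ID is the name of the file, which is unique across the different sets of files. Can anyone tell me what is wrong with the code?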

1 Answer

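You can't reuse a correlation ID, because it is kept in an object store (which explains why it works again after restarting Mule).
You must use a different correlation ID for each aggregation group.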
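
One possible way to get a different ID per group, while still giving every part of the same group the same ID, is to append a generation counter to the extract name. The sketch below is plain Java with no Mule dependency; `CorrelationIdAllocator` and `nextId` are hypothetical names, not part of the Mule API. It also assumes that parts of one group arrive before parts of the next group with the same name, and that the counter may live in memory (it resets on restart, so it would need persisting if the object store were persistent).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not a Mule class): hands out correlation IDs of the
// form "<extractName>-<generation>", so a name that shows up again after a
// completed group gets a fresh ID.
final class CorrelationIdAllocator {

    // Number of parts seen so far for each extract name.
    private final Map<String, Integer> partsSeen = new HashMap<String, Integer>();

    // Returns the correlation ID for the next part of the given extract.
    // Assumption: groupSize parts of one group arrive before the next group starts.
    synchronized String nextId(String extractName, int groupSize) {
        if (groupSize <= 0) {
            throw new IllegalArgumentException("groupSize must be positive");
        }
        Integer previous = partsSeen.get(extractName);
        int seen = (previous == null) ? 0 : previous;
        partsSeen.put(extractName, seen + 1);

        // Every groupSize parts start a new generation.
        int generation = seen / groupSize;
        return extractName + "-" + generation;
    }
}
```

In the transformer from the question, a single shared instance of such a helper could supply the value passed to `message.setCorrelationId(...)` in place of the bare `extractName`, with `extractVOs.get(0).getNoOfParts()` as the group size.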

Answered 2013-07-25T12:53:16.627