我想在哪里做一个流程:
- 等待两个文件: file_name.xdf 和 file_name.dff :如果两个文件(我想同时处理两个文件(等待两个文件),这些文件的文件名应该相同)
- 处理这些文件转换为字节数组
- 运行常规脚本
我怎样才能提出第一点?
我想在哪里做一个流程:
我怎样才能提出第一点?
您可以根据文件的“基本名称”聚合文件(即取不带扩展名的文件名),然后处理聚合集中的每个文件。
可以使用 Collection-Aggregator 或 Custom-Aggregator 执行聚合,这里每个示例都有一个示例:
使用集合聚合器:
<flow name="two-files-per-process-with-collection-aggregator">
<file:inbound-endpoint path="/file-in" moveToDirectory="/file-in-process" responseTimeout="10000" doc:name="Read files" >
<file:filename-regex-filter pattern=".*\.aaa|.*\.bbb" caseSensitive="true"/>
</file:inbound-endpoint>
<set-property propertyName="MULE_CORRELATION_ID" value="#[message.inboundProperties.originalFilename.substring(0,message.inboundProperties.originalFilename.lastIndexOf('.'))]" doc:name="Set Correlation-Id"/>
<set-property propertyName="MULE_CORRELATION_GROUP_SIZE" value="2" doc:name="Set Correlation-Group-Size"/>
<collection-aggregator failOnTimeout="true" doc:name="Collection Aggregator"/>
<foreach doc:name="For Each">
<logger message="#[message.inboundProperties.originalFilename]" level="INFO" doc:name="Some process"/>
</foreach>
</flow>
使用自定义聚合器(您将需要自定义 java 类):
<flow name="two-files-per-process-with-custom-aggregator">
<file:inbound-endpoint path="/file-in" moveToDirectory="/file-in-process" responseTimeout="10000" doc:name="Read files">
<file:filename-regex-filter pattern=".*\.aaa|.*\.bbb" caseSensitive="true"/>
</file:inbound-endpoint>
<custom-aggregator failOnTimeout="true" class="org.mnc.MatchFileNames" doc:name="Custom Aggregator"/>
<foreach doc:name="For Each">
<logger message="#[message.inboundProperties.originalFilename]" level="INFO" doc:name="Some process"/>
</foreach>
</flow>
这是自定义聚合器的可能实现(它可能更优雅:
package org.mnc;
import org.mule.api.MuleContext;
import org.mule.api.MuleEvent;
import org.mule.api.routing.RoutingException;
import org.mule.routing.AbstractAggregator;
import org.mule.routing.EventGroup;
import org.mule.routing.correlation.EventCorrelatorCallback;
public class MatchFileNames extends AbstractAggregator {
@Override
protected EventCorrelatorCallback getCorrelatorCallback(final MuleContext muleContext) {
return new EventCorrelatorCallback() {
@Override
public boolean shouldAggregateEvents(EventGroup events) {
return events.size()==2;
}
@Override
public EventGroup createEventGroup(MuleEvent event, Object id) {
String filename = event.getMessage().getInboundProperty("originalFilename");
String commonFilename = filename.substring(0, filename.lastIndexOf('.'));
System.out.println(filename + " -> " + commonFilename);
return new EventGroup(commonFilename, muleContext, 2, true, storePrefix);
}
@Override
public MuleEvent aggregateEvents(EventGroup events) throws RoutingException {
return events.getMessageCollectionEvent();
}
};
}
}
使用石英组件 - 以所需的时间间隔触发您的流程。在此处阅读更多信息:http: //www.mulesoft.org/documentation/display/current/Quartz+Transport+Reference
触发时 - 编写比较两个目录并查找两者之间的文件对的 Java 代码。
在我的脑海中,我不确定是否有办法动态设置入站文件过滤器。否则 - 总是可以用 Java 处理整个过程;读取文件,转换为字节数组,最后将消息传播到 groovy 脚本。
您可以有两个文件入站端点,一个用于您等待的每个文件。当您的流程读取文件时,它会将文件复制到其他目录。如果另一个文件已被处理并移动到目录(您可以使用对象存储中的变量来跟踪它),则使用 name.ready 保存并将先前移动的文件移动到 name2.ready。
您有一个带有文件入站端点的第三个流,该端点使用 *.ready 通配符模式从该目录读取。然后使用Requester Module将另一个文件加载到变量中。