首先是我的要求的一点背景:
- 从多个动态定义的只读 ftp/sftp 站点下载大型 tar.gz 文件。
- 根据条目名称的扩展名处理 .tar.gz 中的文件。
- 使用骆驼 2.19.3
我的解决方案是使用 download=false 定义新路由,以仅获取未处理文件的列表。示例路线是:
from("ftp://user@localhost/path?download=false&inProgressRepository=#inProgressRepo&idempotentRepository=#idemRepo&noop=true&readLock=changed&readLockMarkerFile=false&autoCreate=false&stepwise=false").to("seda:download?size=3&concurrentConsumers=3&blockWhenFull=true&purgeWhenStopping=true")
将文件名发送到 seda 队列,该队列使用 streamDownload 下载文件并将 RemoteFile 发送到定义为的处理路由器:
from("seda:download?size=3&concurrentConsumers=3&blockWhenFull=true&purgeWhenStopping=true")
.process({
String fileName = exchange.getIn().getHeader(Exchange.FILE_NAME_ONLY, String.class);
CamelContext context = exchange.getContext();
ConsumerTemplate downloadConsumer = context.createConsumerTemplate();
Producer unpackProducer = context.getRoute("unpack").getEndpoint().createProducer();
Map<String,Object> parms = new HashMap<>();
parms.put("fileName", fileName);
parms.put("runLoggingLevel", "INFO");
parms.put("consumer.bridgeErrorHandler", "true");
parms.put("idempotentRepository", "#idemRepo");
parms.put("noop", "true");
parms.put("readLock", "changed");
parms.put("readLockLoggingLevel", "INFO");
parms.put("readLockMarkerFile", "false");
parms.put("initialDelay", "0");
parms.put("autoCreate", "false");
parms.put("maximumReconnectAttempts", "0");
parms.put("streamDownload", "true");
parms.put("stepwise", "false");
parms.put("throwExceptionOnConnectFailed", "true");
parms.put("useList", "false");
downloadConsumer.start();
Exchange downloadExchange = downloadConsumer.receive(URISupport.normalizeUri(URISupport.appendParametersToURI("ftp://user@localhost/path", parms));
unpackProducer.process(downloadExchange);
if (downloadExchange.isFailed()) {
LOGGER.error("unpack failed", downloadExchange.getException());
exchange.setException(downloadExchange.getException());
}
downloadConsumer.doneUoW(downloadExchange);
downloadConsumer.stop();
}
解包路径定义为:
from("direct:unpack").routeId("unpack")
.convertBodyTo(InputStream.class, null)
.split(new TarSplitter()).streaming()
.choice()
.when(header(FILE_NAME).regex(XML_FILTER))
.unmarshal().jacksonxml(POJO.class)
.endChoice()
.when(header(FILE_NAME).regex(XML2_FILTER))
.unmarshal().jacksonxml(POJO2.class)
.endChoice()
.end()
.end()
.to("file://...")
首先,这是支持并发 ftp 消费者的好解决方案吗?我看到新的 FTPClient 实例是在同一个线程上创建和处理的。有更好的解决方案吗?
其次,使用 seda 队列处理流时会出现随机 tar 错误。如果使用直接而不是 seda 以便只处理单个文件,则不会发生错误。这似乎指向一个并发问题。我错过了一些明显的东西吗?
提前感谢您的帮助。