I have the following SFTP file synchronizer:
@Bean
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
    SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
    fileSynchronizer.setDeleteRemoteFiles(false);
    fileSynchronizer.setRemoteDirectory(applicationProperties.getSftpDirectory());
    CompositeFileListFilter<ChannelSftp.LsEntry> compositeFileListFilter = new CompositeFileListFilter<ChannelSftp.LsEntry>();
    compositeFileListFilter.addFilter(new SftpPersistentAcceptOnceFileListFilter(store, "sftp"));
    compositeFileListFilter.addFilter(new SftpSimplePatternFileListFilter(applicationProperties.getLoadFileNamePattern()));
    fileSynchronizer.setFilter(compositeFileListFilter);
    fileSynchronizer.setPreserveTimestamp(true);
    return fileSynchronizer;
}
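When the application first runs, it synchronizes the remote SFTP directory to the local directory. After that, however, it does not pick up any subsequent changes to the files in the remote SFTP directory.
The polling is scheduled as follows: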
@Bean
@InboundChannelAdapter(autoStartup="true", channel = "sftpChannel", poller = @Poller("pollerMetadata"))
public SftpInboundFileSynchronizingMessageSource sftpMessageSource() {
    SftpInboundFileSynchronizingMessageSource source =
            new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
    source.setLocalDirectory(applicationProperties.getScheduledLoadDirectory());
    source.setAutoCreateLocalDirectory(true);
    ChainFileListFilter<File> chainFileFilter = new ChainFileListFilter<File>();
    chainFileFilter.addFilter(new LastModifiedFileListFilter());
    FileSystemPersistentAcceptOnceFileListFilter fs = new FileSystemPersistentAcceptOnceFileListFilter(store, "dailyfilesystem");
    fs.setFlushOnUpdate(true);
    chainFileFilter.addFilter(fs);
    source.setLocalFilter(chainFileFilter);
    source.setCountsEnabled(true);
    return source;
}

@Bean
public PollerMetadata pollerMetadata(RetryCompoundTriggerAdvice retryCompoundTriggerAdvice) {
    PollerMetadata pollerMetadata = new PollerMetadata();
    List<Advice> adviceChain = new ArrayList<Advice>();
    adviceChain.add(retryCompoundTriggerAdvice);
    pollerMetadata.setAdviceChain(adviceChain);
    pollerMetadata.setTrigger(compoundTrigger());
    pollerMetadata.setMaxMessagesPerPoll(1);
    return pollerMetadata;
}

@Bean
public CompoundTrigger compoundTrigger() {
    CompoundTrigger compoundTrigger = new CompoundTrigger(primaryTrigger());
    return compoundTrigger;
}

@Bean
public CronTrigger primaryTrigger() {
    return new CronTrigger(applicationProperties.getSchedule());
}

@Bean
public PeriodicTrigger secondaryTrigger() {
    return new PeriodicTrigger(applicationProperties.getRetryInterval());
}
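In the afterReceive method of RetryCompoundTriggerAdvice, which extends AbstractMessageSourceAdvice, I get a null result after the first run.
How can I configure the synchronizer so that it synchronizes periodically (rather than only once at application startup)?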
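Update
I found that when the SFTP site has no files in its directory at the time my application starts, the SftpInboundFileSynchronizer does synchronize at every polling interval, so I can see the com.jcraft.jsch log statements on each poll. However, as soon as a file is found on the SFTP site, it synchronizes to fetch that file locally and then never synchronizes again.
Update 2
Sorry... here is the custom code: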
@Component
public class RetryCompoundTriggerAdvice extends AbstractMessageSourceAdvice {

    private static final Logger logger = LoggerFactory.getLogger(RetryCompoundTriggerAdvice.class);

    private final CompoundTrigger compoundTrigger;
    private final Trigger override;
    private final ApplicationProperties applicationProperties;
    private final Mail mail;
    private int attempts = 0;
    private boolean expectedMessage;
    private boolean inProcess;

    public RetryCompoundTriggerAdvice(CompoundTrigger compoundTrigger,
                                      @Qualifier("secondaryTrigger") Trigger override,
                                      ApplicationProperties applicationProperties,
                                      Mail mail) {
        this.compoundTrigger = compoundTrigger;
        this.override = override;
        this.applicationProperties = applicationProperties;
        this.mail = mail;
    }

    @Override
    public boolean beforeReceive(MessageSource<?> source) {
        logger.debug("!inProcess is " + !inProcess);
        return !inProcess;
    }

    @Override
    public Message<?> afterReceive(Message<?> result, MessageSource<?> source) {
        if (expectedMessage) {
            logger.info("Received expected load file. Setting cron trigger.");
            this.compoundTrigger.setOverride(null);
            expectedMessage = false;
            return result;
        }
        final int maxOverrideAttempts = applicationProperties.getMaxFileRetry();
        attempts++;
        if (result == null && attempts < maxOverrideAttempts) {
            logger.info("Unable to find file after " + attempts + " attempt(s). Will reattempt");
            this.compoundTrigger.setOverride(this.override);
        } else if (result == null && attempts >= maxOverrideAttempts) {
            String message = "Unable to find daily file" +
                    " after " + attempts +
                    " attempt(s). Will not reattempt since max number of attempts is set at " +
                    maxOverrideAttempts + ".";
            logger.warn(message);
            mail.sendAdminsEmail("Missing Load File", message);
            attempts = 0;
            this.compoundTrigger.setOverride(null);
        } else {
            attempts = 0;
            // keep periodically checking until we are certain
            // that this message is the expected message
            this.compoundTrigger.setOverride(this.override);
            inProcess = true;
            logger.info("Found load file");
        }
        return result;
    }

    public void foundExpectedMessage(boolean found) {
        logger.debug("Expected message was found? " + found);
        this.expectedMessage = found;
        inProcess = false;
    }
}
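To make the symptom easier to reason about, here is a minimal sketch of how the inProcess flag in the advice above could gate polling. It is a plain-Java simulation with no Spring Integration classes, not the real advice. It assumes that the framework skips receive() whenever beforeReceive returns false and still hands a null result to afterReceive. The class PollGateSketch, the simulate method, and the file name load-file.csv are all hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for RetryCompoundTriggerAdvice; only the two flags are modelled. */
final class PollGateSketch {
    private boolean expectedMessage;
    private boolean inProcess;

    /** Mirrors beforeReceive: the poll (and so the remote sync) runs only when this is true. */
    boolean beforeReceive() {
        return !inProcess;
    }

    /** Mirrors the flag handling in afterReceive; triggers, retries and mail are left out. */
    void afterReceive(String result) {
        if (expectedMessage) {
            expectedMessage = false;
            return;
        }
        if (result != null) {
            inProcess = true; // from now on beforeReceive returns false
        }
    }

    /** Mirrors foundExpectedMessage: the only place where inProcess is reset. */
    void foundExpectedMessage(boolean found) {
        this.expectedMessage = found;
        this.inProcess = false;
    }

    /**
     * Runs the given number of polls against a remote that always has a file.
     * foundExpectedMessage(true) is called just before poll number confirmBeforePoll
     * (pass 0 to never call it). Returns, per poll, whether receive() was invoked.
     */
    static List<Boolean> simulate(int polls, int confirmBeforePoll) {
        PollGateSketch advice = new PollGateSketch();
        List<Boolean> receiveInvoked = new ArrayList<>();
        for (int poll = 1; poll <= polls; poll++) {
            if (poll == confirmBeforePoll) {
                advice.foundExpectedMessage(true);
            }
            boolean proceed = advice.beforeReceive();
            // Assumption: a skipped receive() reaches afterReceive as a null result.
            String result = proceed ? "load-file.csv" : null;
            advice.afterReceive(result);
            receiveInvoked.add(proceed);
        }
        return receiveInvoked;
    }

    public static void main(String[] args) {
        System.out.println(simulate(4, 0)); // foundExpectedMessage never called
        System.out.println(simulate(4, 3)); // called before the third poll
    }
}
```

In this simulation, if foundExpectedMessage is never called, only the first poll reaches receive() and every later poll is skipped, which would match both the null results in afterReceive and the missing sync log statements. Whether the real application behaves this way depends on whether the downstream flow actually calls foundExpectedMessage.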