2

我有以下 SFTP 文件同步器:

@Bean
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
    SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
    fileSynchronizer.setDeleteRemoteFiles(false);
    fileSynchronizer.setRemoteDirectory(applicationProperties.getSftpDirectory());
    CompositeFileListFilter<ChannelSftp.LsEntry> compositeFileListFilter = new CompositeFileListFilter<ChannelSftp.LsEntry>();
    compositeFileListFilter.addFilter(new SftpPersistentAcceptOnceFileListFilter(store, "sftp"));
    compositeFileListFilter.addFilter(new SftpSimplePatternFileListFilter(applicationProperties.getLoadFileNamePattern()));
    fileSynchronizer.setFilter(compositeFileListFilter);
    fileSynchronizer.setPreserveTimestamp(true);
    return fileSynchronizer;
}

当应用程序首次运行时,它会与远程 SFTP 站点目录同步到本地目录。但是,它无法获取远程 SFTP 目录文件中的任何后续更改。

计划轮询如下:

@Bean
@InboundChannelAdapter(autoStartup="true", channel = "sftpChannel", poller = @Poller("pollerMetadata"))
public SftpInboundFileSynchronizingMessageSource sftpMessageSource() {
    SftpInboundFileSynchronizingMessageSource source =
            new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
    source.setLocalDirectory(applicationProperties.getScheduledLoadDirectory());
    source.setAutoCreateLocalDirectory(true);
    ChainFileListFilter<File> chainFileFilter = new ChainFileListFilter<File>();
    chainFileFilter.addFilter(new LastModifiedFileListFilter());
    FileSystemPersistentAcceptOnceFileListFilter fs = new FileSystemPersistentAcceptOnceFileListFilter(store, "dailyfilesystem");
    fs.setFlushOnUpdate(true);
    chainFileFilter.addFilter(fs);
    source.setLocalFilter(chainFileFilter);
    source.setCountsEnabled(true);
    return source;
}

@Bean
public PollerMetadata pollerMetadata(RetryCompoundTriggerAdvice retryCompoundTriggerAdvice) {
    PollerMetadata pollerMetadata = new PollerMetadata();
    List<Advice> adviceChain = new ArrayList<Advice>();
    adviceChain.add(retryCompoundTriggerAdvice);
    pollerMetadata.setAdviceChain(adviceChain);
    pollerMetadata.setTrigger(compoundTrigger());
    pollerMetadata.setMaxMessagesPerPoll(1);
    return pollerMetadata;
}

@Bean
public CompoundTrigger compoundTrigger() {
    CompoundTrigger compoundTrigger = new CompoundTrigger(primaryTrigger());
    return compoundTrigger;
}

@Bean
public CronTrigger primaryTrigger() {
    return new CronTrigger(applicationProperties.getSchedule());
}

@Bean
public PeriodicTrigger secondaryTrigger() {
    return new PeriodicTrigger(applicationProperties.getRetryInterval());
}

在which extends的afterReceive方法中,第一次运行后我得到一个空结果。RetryCompoundTriggerAdviceAbstractMessageSourceAdvice

如何配置同步器以使其定期同步(而不是在应用程序启动时仅同步一次)?

更新

我发现当 SFTP 站点在我的应用程序启动时其目录中没有文件时,会SftpInboundFileSynchronizer在每个轮询间隔进行同步。com.jcraft.jsch所以我可以在每次投票时看到日志语句。但是,一旦在 SFTP 站点上找到文件,它就会同步以在本地获取该文件,然后不再同步。

更新 2

抱歉……这是自定义代码:

@Component
public class RetryCompoundTriggerAdvice extends AbstractMessageSourceAdvice {

    private final static Logger logger = LoggerFactory.getLogger(RetryCompoundTriggerAdvice.class);

    private final CompoundTrigger compoundTrigger;

    private final Trigger override;

    private final ApplicationProperties applicationProperties;

    private final Mail mail;

    private int attempts = 0;

    private boolean expectedMessage;
    private boolean inProcess;

    public RetryCompoundTriggerAdvice(CompoundTrigger compoundTrigger, 
            @Qualifier("secondaryTrigger") Trigger override, 
            ApplicationProperties applicationProperties,
            Mail mail) {
        this.compoundTrigger = compoundTrigger;
        this.override = override;
        this.applicationProperties = applicationProperties;
        this.mail = mail;
    }

    @Override
    public boolean beforeReceive(MessageSource<?> source) {
        logger.debug("!inProcess is " + !inProcess);
        return !inProcess;
    }

    @Override
    public Message<?> afterReceive(Message<?> result, MessageSource<?> source) {

        if (expectedMessage) {
            logger.info("Received expected load file. Setting cron trigger.");
            this.compoundTrigger.setOverride(null);
            expectedMessage = false;
            return result;
        }

        final int  maxOverrideAttempts = applicationProperties.getMaxFileRetry();

        attempts++;
        if (result == null && attempts < maxOverrideAttempts) {
            logger.info("Unable to find file after " + attempts + " attempt(s). Will reattempt");
            this.compoundTrigger.setOverride(this.override);
        } else if (result == null && attempts >= maxOverrideAttempts) {
            String message = "Unable to find daily file" + 
                    " after " + attempts + 
                    " attempt(s). Will not reattempt since max number of attempts is set at " + 
                    maxOverrideAttempts + "."; 
            logger.warn(message);
            mail.sendAdminsEmail("Missing Load File", message);
            attempts = 0;
            this.compoundTrigger.setOverride(null);
        } else {
            attempts = 0;
            // keep periodically checking until we are certain
            // that this message is the expected message
            this.compoundTrigger.setOverride(this.override);
            inProcess = true;
            logger.info("Found load file");
        }
        return result;
    }

    public void foundExpectedMessage(boolean found) {
        logger.debug("Expected message was found? " + found);
        this.expectedMessage = found;
        inProcess = false;
    }

}
4

1 回答 1

1

你有这样的逻辑:

@Override
public boolean beforeReceive(MessageSource<?> source) {
    logger.debug("!inProcess is " + !inProcess);
    return !inProcess;
}

让我们研究一下它的JavaDoc:

/**
 * Subclasses can decide whether to proceed with this poll.
 * @param source the message source.
 * @return true to proceed.
 */
public abstract boolean beforeReceive(MessageSource<?> source);

以及围绕此方法的逻辑:

    Message<?> result = null;
    if (beforeReceive((MessageSource<?>) target)) {
        result = (Message<?>) invocation.proceed();
    }
    return afterReceive(result, (MessageSource<?>) target);

因此,我们仅在返回时才调用invocation.proceed()(SFTP 同步) 。在您的情况下,仅当.beforeReceive()true!inProcess

在你的afterReceive() 实现inProcess = true;中,如果你有result- 在第一次尝试。并且看起来您将其重置为false仅当有人调用它时foundExpectedMessage()

那么,您对我们的问题有何期望?它确实在您的自定义代码中,与框架无关。对不起...

于 2017-07-24T20:08:58.780 回答