2

我编写了一段代码,它使用 Spring SFTP Outbound 网关并执行 GET 操作。整个配置在 JAVA 中(没有 XML)。我制作了一个缓存会话工厂,最多允许 10 个会话。由于多次 GET 请求超过 10 次后,GET 请求开始失败。

我阅读了文档,它被写入操作后关闭会话,但我无法弄清楚如何在 JAVA 配置中关闭此会话?

@org.springframework.integration.annotation.MessagingGateway
public interface FileOperationGateway {
    @Gateway(requestChannel = "sftpChannelDownload")
    InputStream downloadFromSftp(Message<Boolean> message);

}



@Bean
public SessionFactory<LsEntry> sftpSessionFactory() {
    DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
    factory.setHost(SFTP_HOST);
    factory.setPort(SFTP_PORT);
    factory.setUser(SFTP_USERNAME);
    factory.setPassword(SFTP_PASSWORD);
    factory.setAllowUnknownKeys(true);
    return new CachingSessionFactory<LsEntry>(factory);
}

/**
 * Bean for Caching the session 
 * 
 */

@Bean
@Autowired
public CachingSessionFactory<LsEntry> cachingSessionFactory(SessionFactory<LsEntry> sftpSessionFactory) {
    CachingSessionFactory<LsEntry> cachingSessionFactory = new CachingSessionFactory<>(sftpSessionFactory, 10);
    cachingSessionFactory.setSessionWaitTimeout(SFTP_SESSION_TIMEOUT);
    return cachingSessionFactory;
}

/**
 * Bean for Remote File Template 
 * 
 * @return
 * @throws Exception 
 */

@Bean
@Autowired
public RemoteFileTemplate<LsEntry> remoteFileTemplateDesigner(CachingSessionFactory<LsEntry> csf) throws Exception {
    ExpressionParser expressionParser = new SpelExpressionParser();
    Expression expression = expressionParser.parseExpression("'" + SFTP_LOCATION + "'");
    SftpRemoteFileTemplate rft = new SftpRemoteFileTemplate(csf);
    rft.setRemoteDirectoryExpression(expression);
    rft.setRemoteFileSeparator("/");
    rft.setFileNameGenerator((msg) -> {
        Timestamp timestamp = new Timestamp(System.currentTimeMillis());
        Instant instant = timestamp.toInstant();
        String fileNameFromHeader = msg.getHeaders().get(FileOperationConstants.FILE_HEADER_KEY).toString();
        String newFileName;
        if (fileNameFromHeader.lastIndexOf("/") != -1) {
            newFileName = fileNameFromHeader.substring(fileNameFromHeader.lastIndexOf("/"));
        } else if (fileNameFromHeader.lastIndexOf("\\") != -1) {
            newFileName = fileNameFromHeader.substring(fileNameFromHeader.lastIndexOf("\\"));
        } else
            newFileName = fileNameFromHeader;

        String fileNameOnly = newFileName.substring(0, newFileName.lastIndexOf("."));
        String fileType = newFileName.substring(newFileName.lastIndexOf(".") + 1);
        return (fileNameOnly + "__" + instant.toString() + "." + fileType);
    });
    rft.afterPropertiesSet();
    return rft;
}

@Bean
@Autowired
@ServiceActivator(inputChannel = "sftpChannelDownload")
public SftpOutboundGatewaySpec downloadHandler(RemoteFileTemplate<LsEntry> rft) {
    SftpOutboundGatewaySpec sogs =  Sftp.outboundGateway(rft, FileOperationConstants.FILE_DOWNLOAD_COMMAND,
            FileOperationConstants.FILE_DOWNLOAD_EXPRESSION);
    sogs.options(Option.STREAM);
    return sogs;
}

******更新:******

我使用@messageEndpoint 创建了一个新类,并将可关闭的会话代码放入其中。然后我从我的服务类(我正在消费流的地方)调用这个处理程序这工作:

    @MessageEndpoint
public class FileOperationCloseSessionMessageHandler {

    @ServiceActivator(inputChannel = "sftpCloseSession")
    public void closeSession(Message<Boolean> msg) throws IOException {

        Closeable closeable = new IntegrationMessageHeaderAccessor(msg).getCloseableResource();
        if (closeable != null) {
            closeable.close();
        }
    }
}

将此行放在@MessagingGateway 带注释的类中

@Gateway(requestChannel = "sftpCloseSession")
void closeSession(Message<InputStream> msg);

然后从服务类调用网关方法:

Message<InputStream> msg = msgGateway.downloadFromSftp(message);
    InputStream is = msg.getPayload();
    msgGateway.closeSession(msg);
4

1 回答 1

0

sogs.options(Option.STREAM);

流式传输文件时,您有责任在完成流式传输后关闭会话。这在文档中进行了解释。

将远程文件作为流消费时,用户负责在消费流后关闭 Session。为方便起见,在 IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE 标头中提供了 Session,在 IntegrationMessageHeaderAccessor 上提供了一个方便的方法:

Closeable closeable = new IntegrationMessageHeaderAccessor(message).getCloseableResource();
if (closeable != null) {
    closeable.close();
}

File Splitter 和 Stream Transformer 等框架组件将在数据传输后自动关闭会话。

于 2017-03-27T15:52:50.750 回答