4

我有一个 SFTP 目录并读取文件并将文件发送到 ServiceActivator 以进行进一步处理。在任何时候我都需要使用处理程序并行处理它们。

这是我的 Spring Integration java DSL 流程。

IntegrationFlows.from(Sftp.inboundAdapter(getSftpSessionFactory())
                        .temporaryFileSuffix("COPY")
                        .localDirectory(directory)
                        .deleteRemoteFiles(false)
                        .preserveTimestamp(true)
                        .remoteDirectory("remoteDir"))
                        .patternFilter("*.txt")), e -> e.poller(Pollers.fixedDelay(500).maxMessagesPerPoll(5)))
                        .handle("mybean", "myMethod")
                        .handle(Files.outboundAdapter(new File("success")))         
                        .deleteSourceFiles(true)
                        .autoCreateDirectory(true))
                        .get();

更新:这是我的 ThreadPoolExecutor:

@Bean(name = "executor")
public Executor getExecutor()
{
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(4);
    executor.setMaxPoolSize(4);
    executor.setQueueCapacity(20);          
    executor.initialize();
    return executor;
}
4

1 回答 1

1

( Sftp.inboundAdapter())SftpInboundFileSynchronizingMessageSource无论如何都会一一返回远程文件。首先,它将它们同步到本地目录,然后才轮询它们以将消息处理为File有效负载。

要并行处理它们,只需将 a 添加taskExecutor到您的e.poller()定义中,所有这些maxMessagesPerPoll(5)都将分发到不同的线程。

于 2016-03-11T23:28:38.047 回答