You can update the AbstractInboundFileSynchronizer to recognize updated files, but it is brittle and you will run into other issues.
Update 13 Nov 2016: Found out how to get modification timestamps in seconds.
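The main issue with updating the AbstractInboundFileSynchronizer is that it has setter methods but no (protected) getter methods. The subclass therefore has to override each setter and keep its own copy of the value. If the setter methods ever start doing something clever, the updated version presented here will break.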
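The main issue with updating files in the local directory is concurrency: if you are processing a local file at the same time an update is received, you can run into all sorts of trouble. The easy way out is to move local files to a (temporary) processing directory so that updates can be received as new files, which in turn removes the need to update the AbstractInboundFileSynchronizer. See also Camel's timestamp remarks.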
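The "move to a processing directory" approach can be sketched with plain java.nio. This is a minimal illustration, not part of the tested setup: the class name ProcessingDirMover, the method claimForProcessing and the directory names are hypothetical, and overwriting a leftover file with the same name is an assumption of the sketch.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class ProcessingDirMover {

    /** Moves a received file into the processing directory and returns its new location. */
    public static Path claimForProcessing(Path receivedFile, Path processingDir) throws IOException {
        Files.createDirectories(processingDir);
        Path target = processingDir.resolve(receivedFile.getFileName());
        // Assumption: a leftover file with the same name from an earlier run may be overwritten.
        return Files.move(receivedFile, target, StandardCopyOption.REPLACE_EXISTING);
    }

    public static void main(String[] args) throws IOException {
        Path base = Files.createTempDirectory("ftp-sync-demo");
        Path received = Files.createDirectories(base.resolve("received"));
        Path file = Files.write(received.resolve("test1.txt"), "Hello".getBytes(StandardCharsets.UTF_8));

        Path claimed = claimForProcessing(file, base.resolve("processing"));

        // The receive directory no longer contains the file, so the next
        // synchronization run sees the remote file as a new file.
        System.out.println("Processing " + claimed);
        System.out.println("Still in receive directory: " + Files.exists(file));
    }
}
```

Once the file has left the receive directory, the synchronizer never has to overwrite a file that is being read, so the stock FtpInboundFileSynchronizer is probably sufficient.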
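By default, FTP servers provide modification timestamps in minutes. For testing I updated the FTP client to use the MLSD command, which provides modification timestamps in seconds (and milliseconds if you are lucky), but not all FTP servers support this.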
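To illustrate where the extra precision comes from: in an MLSD listing (RFC 3659) each entry carries a modify fact in the form YYYYMMDDHHMMSS with an optional fraction, always in UTC. The sketch below parses such an entry by hand. The class and method names are hypothetical and the sample entry is made up; in the tested setup this parsing is left to commons-net.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class MlsdModifyFact {

    private static final DateTimeFormatter SECONDS = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

    /** Parses an RFC 3659 time-val such as 20161113101530 or 20161113101530.123 (UTC). */
    public static Instant parseTimeVal(String timeVal) {
        int dot = timeVal.indexOf('.');
        String secondsPart = dot < 0 ? timeVal : timeVal.substring(0, dot);
        Instant instant = LocalDateTime.parse(secondsPart, SECONDS).toInstant(ZoneOffset.UTC);
        if (dot >= 0) {
            // Keep at most millisecond precision, padding short fractions ("5" becomes "500").
            String fraction = (timeVal.substring(dot + 1) + "000").substring(0, 3);
            instant = instant.plusMillis(Long.parseLong(fraction));
        }
        return instant;
    }

    /** Returns the modify fact of one MLSD entry, or null if the server did not send one. */
    public static Instant modifiedOf(String mlsdEntry) {
        // Facts come first and contain no spaces; the file name follows the first space.
        int space = mlsdEntry.indexOf(' ');
        String facts = space < 0 ? mlsdEntry : mlsdEntry.substring(0, space);
        for (String fact : facts.split(";")) {
            if (fact.regionMatches(true, 0, "modify=", 0, 7)) {
                return parseTimeVal(fact.substring(7));
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Made-up sample entry for illustration.
        String entry = "modify=20161113101530.123;size=12;type=file; test1.txt";
        System.out.println(modifiedOf(entry));
    }
}
```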
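As mentioned in the Spring FTP reference, the local file filter needs to be a FileSystemPersistentAcceptOnceFileListFilter to ensure local files are picked up when the modification timestamp changes.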
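The idea behind such a filter can be shown in a few lines: remember the last seen timestamp per file and accept the file again when that timestamp differs. The class below is a simplified stand-in written for illustration only (hypothetical name, in-memory map instead of a metadata store); it is not Spring's implementation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class TimestampAcceptOnceFilter {

    // Maps a file path to the modification timestamp it had when last accepted.
    private final Map<String, Long> seen = new ConcurrentHashMap<>();

    /** Returns true the first time a path is seen and again whenever its timestamp differs. */
    public boolean accept(String path, long lastModifiedMillis) {
        Long previous = seen.put(path, lastModifiedMillis);
        return previous == null || previous != lastModifiedMillis;
    }

    public static void main(String[] args) {
        TimestampAcceptOnceFilter filter = new TimestampAcceptOnceFilter();
        System.out.println(filter.accept("/tmp/received/test1.txt", 1000L)); // new file
        System.out.println(filter.accept("/tmp/received/test1.txt", 1000L)); // unchanged
        System.out.println(filter.accept("/tmp/received/test1.txt", 2000L)); // updated
    }
}
```

A plain accept-once filter that only remembers file names would reject the third call, which is why an updated local file would never produce a new message.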
Below is my version of the updated AbstractInboundFileSynchronizer, followed by some test classes I used.
public class FtpUpdatingFileSynchronizer extends FtpInboundFileSynchronizer {
protected final Log logger = LogFactory.getLog(this.getClass());
private volatile Expression localFilenameGeneratorExpression;
private volatile EvaluationContext evaluationContext;
private volatile boolean deleteRemoteFiles;
private volatile String remoteFileSeparator = "/";
private volatile boolean preserveTimestamp;
public FtpUpdatingFileSynchronizer(SessionFactory<FTPFile> sessionFactory) {
super(sessionFactory);
setPreserveTimestamp(true);
}
@Override
public void setLocalFilenameGeneratorExpression(Expression localFilenameGeneratorExpression) {
super.setLocalFilenameGeneratorExpression(localFilenameGeneratorExpression);
this.localFilenameGeneratorExpression = localFilenameGeneratorExpression;
}
@Override
public void setIntegrationEvaluationContext(EvaluationContext evaluationContext) {
super.setIntegrationEvaluationContext(evaluationContext);
this.evaluationContext = evaluationContext;
}
@Override
public void setDeleteRemoteFiles(boolean deleteRemoteFiles) {
super.setDeleteRemoteFiles(deleteRemoteFiles);
this.deleteRemoteFiles = deleteRemoteFiles;
}
@Override
public void setRemoteFileSeparator(String remoteFileSeparator) {
super.setRemoteFileSeparator(remoteFileSeparator);
this.remoteFileSeparator = remoteFileSeparator;
}
@Override
public void setPreserveTimestamp(boolean preserveTimestamp) {
// updated
Assert.isTrue(preserveTimestamp, "for updating timestamps must be preserved");
super.setPreserveTimestamp(preserveTimestamp);
this.preserveTimestamp = preserveTimestamp;
}
@Override
protected void copyFileToLocalDirectory(String remoteDirectoryPath, FTPFile remoteFile, File localDirectory,
Session<FTPFile> session) throws IOException {
String remoteFileName = this.getFilename(remoteFile);
String localFileName = this.generateLocalFileName(remoteFileName);
String remoteFilePath = (remoteDirectoryPath != null
? (remoteDirectoryPath + this.remoteFileSeparator + remoteFileName)
: remoteFileName);
if (!this.isFile(remoteFile)) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("cannot copy, not a file: " + remoteFilePath);
}
return;
}
// start update
File localFile = new File(localDirectory, localFileName);
boolean update = false;
if (localFile.exists()) {
if (this.getModified(remoteFile) > localFile.lastModified()) {
this.logger.info("Updating local file " + localFile);
update = true;
} else {
this.logger.info("File already exists: " + localFile);
return;
}
}
// end update
String tempFileName = localFile.getAbsolutePath() + this.getTemporaryFileSuffix();
File tempFile = new File(tempFileName);
OutputStream outputStream = new BufferedOutputStream(new FileOutputStream(tempFile));
try {
session.read(remoteFilePath, outputStream);
} catch (Exception e) {
if (e instanceof RuntimeException) {
throw (RuntimeException) e;
}
else {
throw new MessagingException("Failure occurred while copying from remote to local directory", e);
}
} finally {
try {
outputStream.close();
}
catch (Exception ignored2) {
}
}
// updated
if (update && !localFile.delete()) {
throw new MessagingException("Unable to delete local file [" + localFile + "] for update.");
}
if (tempFile.renameTo(localFile)) {
if (this.deleteRemoteFiles) {
session.remove(remoteFilePath);
if (this.logger.isDebugEnabled()) {
this.logger.debug("deleted " + remoteFilePath);
}
}
// updated
this.logger.info("Stored file locally: " + localFile);
} else {
// updated
throw new MessagingException("Unable to rename temporary file [" + tempFile + "] to [" + localFile + "]");
}
if (this.preserveTimestamp) {
localFile.setLastModified(getModified(remoteFile));
}
}
private String generateLocalFileName(String remoteFileName) {
if (this.localFilenameGeneratorExpression != null) {
return this.localFilenameGeneratorExpression.getValue(this.evaluationContext, remoteFileName, String.class);
}
return remoteFileName;
}
}
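One thing to be aware of in copyFileToLocalDirectory above: deleting the local file and then renaming the temporary file leaves a short window in which the local file does not exist. The following is a possible alternative, sketched with java.nio only and not tested against Spring Integration. The class name, method names and the ".writing" suffix are assumptions made for this example.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.FileTime;

public class LocalFileUpdater {

    /** Returns true when the local copy is missing or older than the remote timestamp. */
    public static boolean needsUpdate(Path localFile, long remoteModifiedMillis) throws IOException {
        return !Files.exists(localFile)
                || remoteModifiedMillis > Files.getLastModifiedTime(localFile).toMillis();
    }

    /** Writes the content to a temporary sibling file, then replaces the local file in one move. */
    public static void store(Path localFile, byte[] content, long remoteModifiedMillis) throws IOException {
        Path temp = localFile.resolveSibling(localFile.getFileName() + ".writing");
        Files.write(temp, content);
        // REPLACE_EXISTING avoids the separate delete step before the rename.
        Files.move(temp, localFile, StandardCopyOption.REPLACE_EXISTING);
        // Preserve the remote timestamp so that the next comparison works.
        Files.setLastModifiedTime(localFile, FileTime.fromMillis(remoteModifiedMillis));
    }

    public static void main(String[] args) throws IOException {
        Path local = Files.createTempDirectory("ftp-update-demo").resolve("test1.txt");
        long remoteTime = 1479031200000L; // whole seconds, as some file systems drop milliseconds

        store(local, "old".getBytes(StandardCharsets.UTF_8), remoteTime);
        System.out.println("Update needed, same timestamp: " + needsUpdate(local, remoteTime));
        System.out.println("Update needed, newer remote: " + needsUpdate(local, remoteTime + 60_000L));
    }
}
```

Whether the replacing move is atomic depends on the platform and file system, so this narrows the window rather than guaranteeing that readers never see a partial state.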
Following are some test classes I used. I used the dependencies org.springframework.integration:spring-integration-ftp:4.3.5.RELEASE and org.apache.ftpserver:ftpserver-core:1.0.6 (plus the usual logging and testing dependencies).
public class TestFtpSync {
static final Logger log = LoggerFactory.getLogger(TestFtpSync.class);
static final String FTP_ROOT_DIR = "target" + File.separator + "ftproot";
// org.apache.ftpserver:ftpserver-core:1.0.6
static FtpServer server;
@BeforeClass
public static void startServer() throws FtpException {
File ftpRoot = new File (FTP_ROOT_DIR);
ftpRoot.mkdirs();
TestUserManager userManager = new TestUserManager(ftpRoot.getAbsolutePath());
FtpServerFactory serverFactory = new FtpServerFactory();
serverFactory.setUserManager(userManager);
ListenerFactory factory = new ListenerFactory();
factory.setPort(4444);
serverFactory.addListener("default", factory.createListener());
server = serverFactory.createServer();
server.start();
}
@AfterClass
public static void stopServer() {
if (server != null) {
server.stop();
}
}
File ftpFile = Paths.get(FTP_ROOT_DIR, "test1.txt").toFile();
File ftpFile2 = Paths.get(FTP_ROOT_DIR, "test2.txt").toFile();
@Test
public void syncDir() {
// org.springframework.integration:spring-integration-ftp:4.3.5.RELEASE
AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext();
try {
ctx.register(FtpSyncConf.class);
ctx.refresh();
PollableChannel msgChannel = ctx.getBean("inputChannel", PollableChannel.class);
for (int j = 0; j < 2; j++) {
for (int i = 0; i < 2; i++) {
storeFtpFile();
}
for (int i = 0; i < 4; i++) {
fetchMessage(msgChannel);
}
}
} catch (Exception e) {
throw new AssertionError("FTP test failed.", e);
} finally {
ctx.close();
cleanup();
}
}
boolean tswitch = true;
void storeFtpFile() throws IOException, InterruptedException {
File f = (tswitch ? ftpFile : ftpFile2);
tswitch = !tswitch;
log.info("Writing message " + f.getName());
Files.write(f.toPath(), ("Hello " + System.currentTimeMillis()).getBytes());
}
Message<?> fetchMessage(PollableChannel msgChannel) {
log.info("Fetching message.");
Message<?> msg = msgChannel.receive(1000L);
if (msg == null) {
log.info("No message.");
} else {
log.info("Have a message: " + msg);
}
return msg;
}
void cleanup() {
delFile(ftpFile);
delFile(ftpFile2);
File d = new File(FtpSyncConf.LOCAL_DIR);
if (d.isDirectory()) {
for (File f : d.listFiles()) {
delFile(f);
}
}
log.info("Finished cleanup");
}
void delFile(File f) {
if (f.isFile()) {
if (f.delete()) {
log.info("Deleted " + f);
} else {
log.error("Cannot delete file " + f);
}
}
}
}
public class MlistFtpSessionFactory extends AbstractFtpSessionFactory<MlistFtpClient> {
@Override
protected MlistFtpClient createClientInstance() {
return new MlistFtpClient();
}
}
public class MlistFtpClient extends FTPClient {
@Override
public FTPFile[] listFiles(String pathname) throws IOException {
return super.mlistDir(pathname);
}
}
@EnableIntegration
@Configuration
public class FtpSyncConf {
private static final Logger log = LoggerFactory.getLogger(FtpSyncConf.class);
public static final String LOCAL_DIR = "/tmp/received";
@Bean(name = "ftpMetaData")
public ConcurrentMetadataStore ftpMetaData() {
return new SimpleMetadataStore();
}
@Bean(name = "localMetaData")
public ConcurrentMetadataStore localMetaData() {
return new SimpleMetadataStore();
}
@Bean(name = "ftpFileSyncer")
public FtpUpdatingFileSynchronizer ftpFileSyncer(
@Qualifier("ftpMetaData") ConcurrentMetadataStore metadataStore) {
MlistFtpSessionFactory ftpSessionFactory = new MlistFtpSessionFactory();
ftpSessionFactory.setHost("localhost");
ftpSessionFactory.setPort(4444);
ftpSessionFactory.setUsername("demo");
ftpSessionFactory.setPassword("demo");
FtpPersistentAcceptOnceFileListFilter fileFilter = new FtpPersistentAcceptOnceFileListFilter(metadataStore, "ftp");
fileFilter.setFlushOnUpdate(true);
FtpUpdatingFileSynchronizer ftpFileSync = new FtpUpdatingFileSynchronizer(ftpSessionFactory);
ftpFileSync.setFilter(fileFilter);
// ftpFileSync.setDeleteRemoteFiles(true);
return ftpFileSync;
}
@Bean(name = "syncFtp")
@InboundChannelAdapter(value = "inputChannel", poller = @Poller(fixedDelay = "500", maxMessagesPerPoll = "1"))
public MessageSource<File> syncChannel(
@Qualifier("localMetaData") ConcurrentMetadataStore metadataStore,
@Qualifier("ftpFileSyncer") FtpUpdatingFileSynchronizer ftpFileSync) throws Exception {
FtpInboundFileSynchronizingMessageSource messageSource = new FtpInboundFileSynchronizingMessageSource(ftpFileSync);
File receiveDir = new File(LOCAL_DIR);
receiveDir.mkdirs();
messageSource.setLocalDirectory(receiveDir);
messageSource.setLocalFilter(new FileSystemPersistentAcceptOnceFileListFilter(metadataStore, "local"));
log.info("Message source bean created.");
return messageSource;
}
@Bean(name = "inputChannel")
public PollableChannel inputChannel() {
QueueChannel channel = new QueueChannel();
log.info("Message channel bean created.");
return channel;
}
}
/**
* Copied from https://github.com/spring-projects/spring-integration-samples/tree/master/basic/ftp/src/test/java/org/springframework/integration/samples/ftp/support
* @author Gunnar Hillert
*
*/
public class TestUserManager extends AbstractUserManager {
private BaseUser testUser;
private BaseUser anonUser;
private static final String TEST_USERNAME = "demo";
private static final String TEST_PASSWORD = "demo";
public TestUserManager(String homeDirectory) {
super("admin", new ClearTextPasswordEncryptor());
testUser = new BaseUser();
testUser.setAuthorities(Arrays.asList(new Authority[] {new ConcurrentLoginPermission(1, 1), new WritePermission()}));
testUser.setEnabled(true);
testUser.setHomeDirectory(homeDirectory);
testUser.setMaxIdleTime(10000);
testUser.setName(TEST_USERNAME);
testUser.setPassword(TEST_PASSWORD);
anonUser = new BaseUser(testUser);
anonUser.setName("anonymous");
}
public User getUserByName(String username) throws FtpException {
if(TEST_USERNAME.equals(username)) {
return testUser;
} else if(anonUser.getName().equals(username)) {
return anonUser;
}
return null;
}
public String[] getAllUserNames() throws FtpException {
return new String[] {TEST_USERNAME, anonUser.getName()};
}
public void delete(String username) throws FtpException {
throw new UnsupportedOperationException("Deleting of FTP Users is not supported.");
}
public void save(User user) throws FtpException {
throw new UnsupportedOperationException("Saving of FTP Users is not supported.");
}
public boolean doesExist(String username) throws FtpException {
return TEST_USERNAME.equals(username) || anonUser.getName().equals(username);
}
public User authenticate(Authentication authentication) throws AuthenticationFailedException {
if(UsernamePasswordAuthentication.class.isAssignableFrom(authentication.getClass())) {
UsernamePasswordAuthentication upAuth = (UsernamePasswordAuthentication) authentication;
if(TEST_USERNAME.equals(upAuth.getUsername()) && TEST_PASSWORD.equals(upAuth.getPassword())) {
return testUser;
}
if(anonUser.getName().equals(upAuth.getUsername())) {
return anonUser;
}
} else if(AnonymousAuthentication.class.isAssignableFrom(authentication.getClass())) {
return anonUser;
}
return null;
}
}
Update 15 Nov 2016: Note on xml-configuration.
The xml-element inbound-channel-adapter is directly linked to the FtpInboundFileSynchronizer via org.springframework.integration.ftp.config.FtpInboundChannelAdapterParser via FtpNamespaceHandler via spring-integration-ftp-4.3.5.RELEASE.jar!/META-INF/spring.handlers.
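Following the xml-custom reference guide, specifying a custom FtpNamespaceHandler in a local META-INF/spring.handlers file should allow you to use the FtpUpdatingFileSynchronizer instead of the FtpInboundFileSynchronizer. It did not work for me with unit tests though, and a proper solution would probably involve creating extra/modified xsd-files so that the regular inbound-channel-adapter uses the regular FtpInboundFileSynchronizer and a special inbound-updating-channel-adapter uses the FtpUpdatingFileSynchronizer. Doing this properly is a bit out of scope for this answer.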
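You can get started with a quick hack though. You can override the default FtpNamespaceHandler by creating the package org.springframework.integration.ftp.config and the class FtpNamespaceHandler in your local project. Contents are shown below: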
package org.springframework.integration.ftp.config;
public class FtpNamespaceHandler extends org.springframework.integration.config.xml.AbstractIntegrationNamespaceHandler {
@Override
public void init() {
System.out.println("Initializing FTP updating file synchronizer.");
// one updated line below, rest copied from original FtpNamespaceHandler
registerBeanDefinitionParser("inbound-channel-adapter", new MyFtpInboundChannelAdapterParser());
registerBeanDefinitionParser("inbound-streaming-channel-adapter",
new FtpStreamingInboundChannelAdapterParser());
registerBeanDefinitionParser("outbound-channel-adapter", new FtpOutboundChannelAdapterParser());
registerBeanDefinitionParser("outbound-gateway", new FtpOutboundGatewayParser());
}
}
package org.springframework.integration.ftp.config;
import org.springframework.integration.file.remote.synchronizer.InboundFileSynchronizer;
// FtpInboundChannelAdapterParser lives in this same package, so it needs no import.
// FtpUpdatingFileSynchronizer must be imported from whichever package you placed it in.
public class MyFtpInboundChannelAdapterParser extends FtpInboundChannelAdapterParser {
@Override
protected Class<? extends InboundFileSynchronizer> getInboundFileSynchronizerClass() {
System.out.println("Returning updating file synchronizer.");
return FtpUpdatingFileSynchronizer.class;
}
}
Also add preserve-timestamp="true" to the xml-file to prevent the new IllegalArgumentException: for updating timestamps must be preserved.