4

目前我正在研究 SFTP 协议。我已经创建了使用 Jsch 库的 SFTP 客户端和使用 Apache Mina Sshd 库的 SFTP 服务器。我已经在它们之间建立了连接,并且可以成功地将文件发送到 SFTP 服务器。现在我正在创建一个 SFTP 服务器处理传入文件的侧文件处理程序。例如,假设 SFTP 服务器可以从 SFTP 客户端接收文件,但目前在我的实现中,没有办法通知文件何时到达服务器。我只是去服务器根文件夹,看看是否有是可用的文件。这就是我知道文件是否到达的方式。

我想实现,当文件到达服务器时,它会通知用户文件到达和文件内容。(文件名和其他详细信息)。但问题是我是 Apache Mina sshd API 的新手。我已经阅读了文档但我想不通。

请我想知道是否有任何已经实现的侦听器来处理 Apache Mina Sshd 服务器中的传入文件,或者如果没有,我如何为传入文件实现自己的侦听器。

SFTP 服务器代码

public class SftpServerStarter {

    private SshServer sshd;
    private final static Logger logger = LoggerFactory.getLogger(SftpServerStarter.class);

    public void start(){


        sshd = SshServer.setUpDefaultServer();
        sshd.setPort(22);
        sshd.setHost("localhost");

        sshd.setPasswordAuthenticator(new MyPasswordAuthenticator());
        sshd.setPublickeyAuthenticator(new MyPublickeyAuthenticator());
        sshd.setKeyPairProvider(new SimpleGeneratorHostKeyProvider());
        sshd.setSubsystemFactories(Arrays.<NamedFactory<Command>>asList(new SftpSubsystem.Factory()));
        sshd.setCommandFactory(new ScpCommandFactory());
        sshd.setFileSystemFactory(new VirtualFileSystemFactory("C:/root"));


        try {
            logger.info("Starting ...");
            sshd.start();
            logger.info("Started");
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
            logger.info("Can not Start Server");
        }
    }

}
4

3 回答 3

14

我开始使用@gihan 的建议,但在用户完成将文件上传到某些客户端之前,文件监视器处理文件时遇到了一些问题。

这是我在 Mina 的源代码中找到的解决方案。尽管 Apache Mina 网站上的文档稀疏、无用,但我认为这是他们打算让开发人员使用他们的库的方式。

注意:由于您的需求可能与我的不同,请记住,这可能不是复制粘贴解决方案。您可能需要修改此代码以满足您的需求,但我相当有信心此代码确实提供了您正在寻找的解决方案的关键。

第 1 步:实现 SftpEventListener

创建您自己的实现org.apache.sshd.server.subsystem.sftp.SftpEventListener. 这是我的一个例子。我的实现设置为在新上传或覆盖文件时运行一系列注册FileUploadCompleteListener方法,并阻止用户尝试导航或创建目录。

public class SFTPServiceSFTPEventListener implements SftpEventListener {

    Logger logger = Logger.getLogger(SFTPServiceSFTPEventListener.class);

    SFTPService service;

    public SFTPServiceSFTPEventListener(SFTPService service) {
        this.service = service;
    }

    public interface FileUploadCompleteListener {
        void onFileReady(File file);
    }

    private List<FileUploadCompleteListener> fileReadyListeners = new ArrayList<FileUploadCompleteListener>();

    public void addFileUploadCompleteListener(FileUploadCompleteListener listener) {
        fileReadyListeners.add(listener);
    }

    public void removeFileUploadCompleteListener(FileUploadCompleteListener listener) {
        fileReadyListeners.remove(listener);
    }

    @Override
    public void initialized(ServerSession serverSession, int version) {

    }

    @Override
    public void destroying(ServerSession serverSession) {

    }

    @Override
    public void open(ServerSession serverSession, String remoteHandle, Handle localHandle) {
        File openedFile = localHandle.getFile().toFile();
        if (openedFile.exists() && openedFile.isFile()) {
        }
    }

    @Override
    public void read(ServerSession serverSession, String remoteHandle, DirectoryHandle localHandle, Map<String,Path> entries) {

    }

    @Override
    public void read(ServerSession serverSession, String remoteHandle, FileHandle localHandle, long offset, byte[] data, int dataOffset, int dataLen, int readLen) {

    }

    @Override
    public void write(ServerSession serverSession, String remoteHandle, FileHandle localHandle, long offset, byte[] data, int dataOffset, int dataLen) {
    }

    @Override
    public void blocking(ServerSession serverSession,  String remoteHandle, FileHandle localHandle, long offset, long length, int mask) {
    }

    @Override
    public void blocked(ServerSession serverSession, String remoteHandle, FileHandle localHandle, long offset, long length, int mask, Throwable thrown) {
    }

    @Override
    public void unblocking(ServerSession serverSession, String remoteHandle, FileHandle localHandle, long offset, long length) {
    }

    @Override
    public void unblocked(ServerSession serverSession, String remoteHandle, FileHandle localHandle, long offset, long length, Boolean result, Throwable thrown) {
    }

    @Override
    public void close(ServerSession serverSession, String remoteHandle, Handle localHandle) {
        File closedFile = localHandle.getFile().toFile();
        if (closedFile.exists() && closedFile.isFile()) {
            logger.info(String.format("User %s closed file: \"%s\"", serverSession.getUsername(), localHandle.getFile().toAbsolutePath()));
            this.service.UserWroteFile(serverSession.getUsername(), localHandle.getFile());

            for (FileUploadCompleteListener fileReadyListener : fileReadyListeners) {
                fileReadyListener.onFileReady(closedFile);
            }
        }
    }

    @Override
    public void creating(ServerSession serverSession, Path path, Map<String,?> attrs) throws UnsupportedOperationException {
        logger.warn(String.format("Blocked user %s attempt to create a directory \"%s\"", serverSession.getUsername(), path.toString()));
        throw new UnsupportedOperationException("Creating sub-directories is not permitted.");
    }

    @Override
    public void created(ServerSession serverSession, Path path, Map<String,?> attrs, Throwable thrown) {
        String username = serverSession.getUsername();
        logger.info(String.format("User %s created: \"%s\"", username, path.toString()));
        service.UserWroteFile(username, path);
    }

    @Override
    public void moving(ServerSession serverSession, Path path, Path path1, Collection<CopyOption> collection) {

    }

    @Override
    public void moved(ServerSession serverSession, Path source, Path destination, Collection<CopyOption> collection, Throwable throwable) {
        String username = serverSession.getUsername();
        logger.info(String.format("User %s moved: \"%s\" to \"%s\"", username, source.toString(), destination.toString()));
        service.UserWroteFile(username, destination);
    }

    @Override
    public void removing(ServerSession serverSession, Path path) {

    }

    @Override
    public void removed(ServerSession serverSession, Path path, Throwable thrown) {

    }

    @Override
    public void linking(ServerSession serverSession, Path source, Path target, boolean symLink) throws UnsupportedOperationException {
        logger.warn(String.format("Blocked user %s attempt to create a link to \"%s\" at \"%s\"", serverSession.getUsername(), target.toString(), source.toString()));
        throw new UnsupportedOperationException("Creating links is not permitted");
    }

    @Override
    public void linked(ServerSession serverSession, Path source, Path target, boolean symLink, Throwable thrown) {

    }

    @Override
    public void modifyingAttributes(ServerSession serverSession, Path path, Map<String,?> attrs) {

    }

    @Override
    public void modifiedAttributes(ServerSession serverSession, Path path, Map<String,?> attrs, Throwable thrown) {
        String username = serverSession.getUsername();
        service.UserWroteFile(username, path);
    }
}

第 2 步:将侦听器的实例添加到服务器

一旦你实现了你的类,你需要做的就是实例化它并在你的服务器上SftpSubsystemFactory调用之前使用一个将它添加到你start()的服务器:

// Your SSHD Server
SshServer sshd = SshServer.setUpDefaultServer();

SftpSubsystemFactory sftpSubsystemFactory= new SftpSubsystemFactory();

// This is where to put your implementation of SftpEventListener
SFTPServiceSFTPEventListener sftpEventListener = new SFTPServiceSFTPEventListener(this);
sftpEventListener.addFileUploadCompleteListener(new SFTPServiceSFTPEventListener.FileUploadCompleteListener() {
    @Override
    public void onFileReady(File file) {
        try {
            doThingsWithFile(file);
        } catch (Exception e) {
            logger.warn(String.format("An error occurred while attempting to do things with the file: \"%s\"", file.getName()), e);
        }
    }
});
sftpSubsystemFactory.addSftpEventListener(sftpEventListener);

List<NamedFactory<Command>> namedFactoryList = new ArrayList<NamedFactory<Command>>();
namedFactoryList.add(sftpSubsystemFactory);
sshd.setSubsystemFactories(namedFactoryList);

// Do your other init stuff...

sshd.start();

完成此操作后,您的实现SftpEventListener将开始自动响应您已实现的事件。我的基本上只是在用户关闭文件时响应(在文件上传完成时发生),但正如我所说,您可以随意实现其他方法来响应其他事件

于 2017-08-04T18:49:54.157 回答
2

我终于找到了一个解决方案,但它不是来自 Apache Mina SSHD API。这里是概念:我们可以监视服务器的根目录的文件更改。如果服务器文件夹中的文件发生更改,则会触发事件。有很多 API 可以做到这一点。在我的代码片段中,我使用org.apache.commons.io.monitor.

SFTPFileListner 类

public static void startMonitor(String rootFolder) throws Exception {

        //every 5 seconds it will start monitoring
        final long pollingInterval = 5 * 1000;

        File folder = new File(rootFolder);

        if (!folder.exists()) {

            throw new RuntimeException("ERROR : Server root directory not found: " + rootFolder);
        }

        FileAlterationObserver observer = new FileAlterationObserver(folder);
        FileAlterationMonitor monitor = new FileAlterationMonitor(pollingInterval);
        FileAlterationListener listener = new FileAlterationListenerAdaptor() {

            @Override
            public void onFileCreate(File file) {
                try {

                    System.out.println("[SFTPFileListner] Received :"+ file.getName());
                    System.out.println("[SFTPFileListner] Received File Path :"+ file.getCanonicalPath());



                } catch (IOException e) {
                    throw new RuntimeException("ERROR: Unrecoverable error when creating files " + e.getMessage(),e);
                }
            }

        };

        observer.addListener(listener);
        monitor.addObserver(observer);
        monitor.start();
    }

创建监控类后,您可以在您的 SFTP 服务器类中调用已实现的方法。

SFTP 服务器类

//pass server root directory 
SFTPFileListner.startMonitor("C:/root");
于 2015-07-15T04:49:20.477 回答
-1

正如我在另一篇文章中提到的,Mina 没有直接让我们能够处理接收或部分接收传入文件时的触发器,但我们的需求非常具体。所以我们唯一的选择是走出米娜,你上面的解决方案就是这样做的。将其作为 Mina 的拉取请求/功能或进一步开发作为开源补充解决方案可能值得推动。我认为这是一个常见的问题,当某些东西进入他们的服务器时,人们将面临一个活跃的通知/触发系统。祝你在其余的开发中好运!

于 2015-07-15T10:54:11.260 回答