19
def download():
if os.path.exists( dst_dir_path ) == False:
    logger.error( "Cannot access destination folder %s. Please check path and permissions. " % ( dst_dir_path ))
    return 1
elif os.path.isdir( dst_dir_path ) == False:
    logger.error( "%s is not a folder. Please check path. " % ( dst_dir_path ))
    return 1

file_list = None
#transport = paramiko.Transport(( hostname, port)) 
paramiko.util.log_to_file('paramiko.log')
ssh = paramiko.SSHClient() 
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 
#transport
try:
    ssh.connect( hostname, username=username, password=password, timeout=5.0) 
    #transport.connect(username=username, password=password ) 
except Exception, err:
    logger.error( "Failed to connect to the remote server. Reason: %s" % ( str(err) ) )
    return 1

try:
    #sftp = paramiko.SFTPClient.from_transport(transport)
    sftp = ssh.open_sftp() 
except Exception, err:
    logger.error( "Failed to start SFTP session from connection to %s. Check that SFTP service is running and available. Reason: %s" % ( hostname, str(err) ))
    return 1

try:    
    sftp.chdir(src_dir_path)
    #file_list = sftp.listdir(path="%s" % ( src_dir_path ) )
    file_list = sftp.listdir()

except Exception, err:
    logger.error( "Failed to list files in folder %s. Please check path and permissions. Reason: %s" % ( src_dir_path, str(err) ))
    return 1
match_text = re.compile( file_mask )
download_count = 0
for file in file_list:         
    # Here is an item name... but is it a file or directory?         
    #logger.info( "Downloading file %s." % ( file ) )
    if not re.match( file_mask, file ):
        continue
    else:
        logger.info( "File \"%s\" name matched file mask \"%s\". matches %s.Processing file..." % ( file, file_mask, (match_text.match( file_mask ) ) ) )
    src_file_path = "./%s" % ( file )
    dst_file_path = "/".join( [ dst_dir_path, file]   )
    retry_count = 0
    while True:
        try:
            logger.info( "Downloading file %s to %s."  % ( file, dst_file_path ) )
            #sftp.get( file, dst_file_path, callback=printTotals ) #sftp.get( remote file, local file )
            sftp.get( file, dst_file_path) #sftp.get( remote file, local file )
            logger.info( "Successfully downloaded file %s to %s."  % ( file, dst_file_path ) )
            download_count += 1
            break
        except Exception, err:
            if retry_count == retry_threshold:
                logger.error( "Failed to download %s to %s. Reason: %s." % ( file, dst_file_path, str(err) ) )
                sftp.close() 
                #transport.close()
                return 1
            else:
                logger.error( "Failed to download %s to %s. Reason: %s." % ( file, dst_file_path, str(err) ) )
                retry_count +=1

sftp.close() 
transport.close() 
logger.info( "%d files downloaded." % ( download_count ) )
return 0

当我运行以下函数时,它会下载源文件大约 3 分钟,然后关闭会话,即使只下载了 1-1.6GB 文件的 38-41MB(不定)。

从 Paramiko 日志文件中,看起来 SSh 连接在 SFTP 会话关闭时保持打开状态:

hmac-sha1', 'hmac-sha1-96', 'hmac-md5', 'hmac-md5-96', 'umac-64@openssh.com'] 服务器mac:['hmac-sha1', 'hmac- sha1-96', 'hmac-md5', 'hmac-md5-96', 'umac-64@openssh.com'] 客户端压缩:['zlib@openssh.com', 'zlib', 'none'] 服务器compress:['zlib@openssh.com', 'zlib', 'none'] 客户端 lang:[''] 服务器 lang:[''] kex 跟随?假 DEB [20120913-10:05:07.421] thr=1 paramiko.transport:密码同意:local=aes128-ctr,remote=aes128-ctr DEB [20120913-10:05:07.421] thr=1 paramiko.transport:使用 kex diffie-hellman-group1-sha1;服务器密钥类型 ssh-dss;密码:本地 aes128-ctr,远程 aes128-ctr;mac:本地 hmac-sha1,远程 hmac-sha1;压缩:本地无,远程无 DEB [20120913-10:05:07。625] thr=1 paramiko.transport: 切换到新密钥... INF [20120913-10:05:10.374] thr=2 paramiko.transport.sftp: [chan 1] sftp 会话关闭。DEB [20120913-10:05:10.388] thr=2 paramiko.transport: [chan 1] EOF 发送 (1)

此后,脚本退出并出现此异常(来自 sftp.get() try/except 块)

资源不足,无法完成请求

系统本身有千兆字节的可用磁盘空间,所以这不是问题。

parakmiko 失败的相同传输在 FileZilla 和我几年前编写的用于 SFTP 传输的 Java 应用程序中运行良好。所以我认为这是 paramiko 的问题。

它在 Windows XP 和 Windows Server 2003 上运行。

我已经尝试修补 Paramko 1.17 以便它更频繁地刷新密钥,但传输仍然会引发异常。Python 2.7.3 Paramiko 1.7 与补丁 Windows 2003 Sevfer

想法?

附加信息:它在 Windows XP SP3 和 Windows 2003 服务器上失败,完全相同的行为和错误消息。sys.version 信息 Window XP Workstation: '2.7.3 (default, Apr 10 2012, 23:31:26) [MSC v.1500 32 bit (Intel)]' Windows 2003 Server: '2.7.3 (default, Apr 10 2012, 23:31:26) [MSC v.1500 32 bit (Intel)]' 我修补了 packet.py 文件以减少密钥更新之间的时间。它对 sftp.get() 的行为没有影响。

4

7 回答 7

16

SFTP 协议没有办法流式传输文件数据;相反,它所拥有的是一种从打开文件中的特定偏移量请求数据块的方法。下载文件的简单方法是请求第一个块,将其写入磁盘,然后请求第二个块,依此类推。这是可靠的,但非常慢。

相反,Paramiko 使用了一个性能技巧:当您调用它时,它会立即为文件中的每个.get()块发送请求,并且它会记住它们应该写入的偏移量。然后,当每个响应到达时,它会确保将其写入磁盘上正确的偏移量。有关详细信息,请参阅 Paramiko 文档中的和方法。我怀疑它在下载 1GB 文件时存储的簿记信息可能会导致......资源耗尽,生成“资源不足”消息。SFTPFile.prefetch()SFTPFile.readv()

.get()如果您只是调用.open()以获取SFTPFile实例,则不要使用,然后调用.read()该对象,或者将其交给 Python 标准库函数shutil.copyfileobj()以下载内容。这应该避免 Paramiko 预取缓存,并允许您下载文件,即使它不是那么快。

IE:

 def lazy_loading_ftp_file(sftp_host_conn, filename):
    """
        Lazy loading ftp file when exception simple sftp.get call
        :param sftp_host_conn: sftp host
        :param filename: filename to be downloaded
        :return: None, file will be downloaded current directory
    """
    import shutil
    try:
        with sftp_host_conn() as host:
            sftp_file_instance = host.open(filename, 'r')
            with open(filename, 'wb') as out_file:
                shutil.copyfileobj(sftp_file_instance, out_file)
            return {"status": "sucess", "msg": "sucessfully downloaded file: {}".format(filename)}
    except Exception as ex:
        return {"status": "failed", "msg": "Exception in Lazy reading too: {}".format(ex)}
于 2013-01-08T07:36:25.587 回答
14

我有一个非常相似的问题,在我的情况下,文件只有 ~400 MB,但在下载大约 35 MB 左右后它会一直失败。它并不总是在下载完全相同的字节数时失败,但在大约 35 - 40 MB 的某个地方,文件会停止传输,大约一分钟后,我会收到“资源不足,无法完成请求”错误。

通过 WinSCP 或 PSFTP 下载文件工作正常。

我尝试了Screwtape 的方法,它确实有效,但速度非常慢。我的 400 MB 文件要花 4 个小时才能下载,这对于这个特定的应用程序来说是一个不可接受的时间范围。

此外,有一次,当我们第一次进行设置时,一切正常。但是服务器管理员对 SFTP 服务器进行了一些更改,这就是事情发生的时候。我不确定更改是什么,但由于使用 WinSCP/其他 SFTP 方法仍然可以正常工作,我认为尝试从服务器端攻击它不会有成果。

我不会假装理解为什么,但最终对我有用的是:

  1. 我下载并安装了当前版本的 Paramiko(此时为 1.11.1)。最初这根本没有任何区别,但我想我会提到它以防万一它是解决方案的一部分。

  2. 异常的堆栈跟踪是:

    File "C:\Python26\lib\site-packages\paramiko\sftp_client.py", line 676, in get
        size = self.getfo(remotepath, fl, callback)
    File "C:\Python26\lib\site-packages\paramiko\sftp_client.py", line 645, in getfo
        data = fr.read(32768)
    File "C:\Python26\lib\site-packages\paramiko\file.py", line 153, in read
        new_data = self._read(read_size)
    File "C:\Python26\lib\site-packages\paramiko\sftp_file.py", line 157, in _read
        data = self._read_prefetch(size)
    File "C:\Python26\lib\site-packages\paramiko\sftp_file.py", line 138, in _read_prefetch
        self._check_exception()
    File "C:\Python26\lib\site-packages\paramiko\sftp_file.py", line 483, in _check_exception
        raise x
    
  3. 在 sftp_file.py 中查看了一下,我注意到了这一点(当前版本中的第 43-45 行):

    # Some sftp servers will choke if you send read/write requests larger than
    # this size.
    MAX_REQUEST_SIZE = 32768
    
  4. 一时兴起,我尝试将 MAX_REQUEST_SIZE 更改为 1024,你瞧,我能够下载整个文件!

  5. 在通过将 MAX_REQUEST_SIZE 更改为 1024 使其工作后,我尝试了 1024 到 32768 之间的其他值,以查看它是否影响性能或其他任何东西。但是当值明显大于 1024(1025 可以,但 1048 最终失败)时,我总是迟早会得到错误。

于 2013-09-23T21:37:55.797 回答
13

我在使用 pysftp 通过 SFTP 下载大文件(>1 GB)时遇到了问题。底层库是 Paramiko。谷歌搜索问题将我带到这里,并且有很好的解决方案。尽管如此,许多帖子都相对较旧,我想这些问题中的大部分已经随着时间的推移而得到解决。它对我的问题没有帮助。

即:Paramiko 在 sftp_file.py 预取期间加载块时遇到内存错误。该列表超出了限制,并且内存错误以某种方式没有阻止执行。它可能在堆栈上以某种方式默默地消耗掉了。仅当发生此错误时下载才会失败,并且它们在单独的线程中运行。

无论如何,控制列表大小的方法是设置 MAX_REQUEST_SIZE:

paramiko.sftp_file.SFTPFile.MAX_REQUEST_SIZE = pow(2, 22) # 4MB per chunk

如果超过 16MB,你会遇到一个新问题:paramiko.sftp.SFTPError: Garbage packet received。原来在 _read_packet 方法中检查了 sftp.py:

# most sftp servers won't accept packets larger than about 32k, so
# anything with the high byte set (> 16MB) is just garbage.
if byte_ord(x[0]):
    raise SFTPError("Garbage packet received")

因此,如果一个块 > 16MB,我们就会引发此错误。我不想摆弄 Paramiko 库本身,所以我不得不将我的块大小保持在 4MB 的“可接受的最大值”。

这样我就能够下载大小> 30GB的文件。希望这对人们有所帮助。

于 2019-10-11T10:47:23.630 回答
5

除了Screwtape的回答之外,还值得一提的是,您可能应该限制块大小.read([block size in bytes])

阅读大文件的懒惰方法

file.read()我在 2.4 中没有块大小大小时遇到​​了真正的问题,但 2.7 可能确定正确的块大小。

于 2013-03-07T12:00:41.727 回答
5

我将这种类型的脚本与 paramiko 一起用于较大的文件,您可以使用window_size/packet size来查看最适合您的方式,如果您希望它具有更高的性能,您可以运行并行进程以使用并行读取不同的文件块第二种方法(见http://docs.paramiko.org/en/latest/api/sftp.html#paramiko.sftp_file.SFTPFile.readv

import time, paramiko

MAX_RETRIES = 10

ftp_server = "ftp.someserver.com"
port = 22
sftp_file = "/somefolder/somefile.txt"
local_file = "/somefolder/somewhere/here.txt"
ssh_conn = sftp_client = None
username = "username"
password = "password"

start_time = time.time()

for retry in range(MAX_RETRIES):
    try:
        ssh_conn = paramiko.Transport((ftp_server, port))
        ssh_conn.connect(username=username, password=password)
        # method 1 using sftpfile.get and settings window_size, max_packet_size
        window_size = pow(4, 12)#about ~16MB chunks
        max_packet_size = pow(4, 12)
        sftp_client = paramiko.SFTPClient.from_transport(ssh_conn, window_size=window_size, max_packet_size=max_packet_size)
        sftp_client.get(sftp_file, local_file)
        # method 2 breaking up file into chunks to read in parallel
        sftp_client = paramiko.SFTPClient.from_transport(ssh_conn)
        filesize = sftp_client.stat(sftp_file).st_size
        chunksize = pow(4, 12)#<-- adjust this and benchmark speed
        chunks = [(offset, chunksize) for offset in range(0, filesize, chunksize)]
        with sftp_client.open(sftp_file, "rb") as infile:
            with open(local_file, "wb") as outfile:
                for chunk in infile.readv(chunks):
                    outfile.write(chunk)
        break
    except (EOFError, paramiko.ssh_exception.SSHException, OSError) as x:
        retry += 1
        print("%s %s - > retrying %s..." % (type(x), x, retry))
        time.sleep(abs(retry) * 10)
        # back off in steps of 10, 20.. seconds 
    finally:
        if hasattr(sftp_client, "close") and callable(sftp_client.close):
            sftp_client.close()
        if hasattr(ssh_conn, "close") and callable(ssh_conn.close):
            ssh_conn.close()


print("Loading File %s Took %d seconds " % (sftp_file, time.time() - start_time))

如果您真的关心性能,您可以运行第二种方法并将文件分成多个进程/线程,这是一个使用多线程的代码示例,它写入多个文件部分,然后将它们连接到一个文件中

import threading, os, time, paramiko

#you could make the number of threads relative to file size
NUM_THREADS = 4
MAX_RETRIES = 10

def make_filepart_path(file_path, part_number):
    """creates filepart path from filepath"""
    return "%s.filepart.%s" % (file_path, part_number+1)

def write_chunks(chunks, tnum, local_file_part, username, password, ftp_server, max_retries):
    ssh_conn = sftp_client = None
    for retry in range(max_retries):
        try:
            ssh_conn = paramiko.Transport((ftp_server, port))
            ssh_conn.connect(username=username, password=password)
            sftp_client = paramiko.SFTPClient.from_transport(ssh_conn)
            with sftp_client.open(sftp_file, "rb") as infile:
                with open(local_file_part, "wb") as outfile:
                    for chunk in infile.readv(chunks):
                        outfile.write(chunk)
            break
        except (EOFError, paramiko.ssh_exception.SSHException, OSError) as x:
            retry += 1
            print("%s %s Thread %s - > retrying %s..." % (type(x), x, tnum, retry))
            time.sleep(abs(retry) * 10)
        finally:
            if hasattr(sftp_client, "close") and callable(sftp_client.close):
                sftp_client.close()
            if hasattr(ssh_conn, "close") and callable(ssh_conn.close):
                ssh_conn.close()



start_time = time.time()

for retry in range(MAX_RETRIES):
    try:
        ssh_conn = paramiko.Transport((ftp_server, port))
        ssh_conn.connect(username=username, password=password)
        sftp_client = paramiko.SFTPClient.from_transport(ssh_conn)
        # connect to get the file's size in order to calculate chunks
        filesize = sftp_client.stat(sftp_file).st_size
        sftp_client.close()
        ssh_conn.close()
        chunksize = pow(4, 12)
        chunks = [(offset, chunksize) for offset in range(0, filesize, chunksize)]
        thread_chunk_size = (len(chunks) // NUM_THREADS) + 1
        # break the chunks into sub lists to hand off to threads
        thread_chunks = [chunks[i:i+thread_chunk_size] for i in range(0, len(chunks) - 1, thread_chunk_size)]
        threads = []
        fileparts = []
        for thread_num in range(len(thread_chunks)):
            local_file_part = make_filepart_path(local_file, thread_num) 
            args = (thread_chunks[thread_num], thread_num, local_file_part, username, password, ftp_server, MAX_RETRIES)
            threads.append(threading.Thread(target=write_chunks, args=args))
            fileparts.append(local_file_part)
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()
        # join file parts into one file, remove fileparts
        with open(local_file, "wb") as outfile:
            for filepart in fileparts:
                with open(filepart, "rb") as infile:
                    outfile.write(infile.read())
                os.remove(filepart)
        break
    except (EOFError, paramiko.ssh_exception.SSHException, OSError) as x:
        retry += 1
        print("%s %s - > retrying %s..." % (type(x), x, retry))
        time.sleep(abs(retry) * 10)
    finally:
       if hasattr(sftp_client, "close") and callable(sftp_client.close):
           sftp_client.close()
       if hasattr(ssh_conn, "close") and callable(ssh_conn.close):
           ssh_conn.close()


print("Loading File %s Took %d seconds " % (sftp_file, time.time() - start_time))
于 2019-06-25T11:25:35.990 回答
3

我尝试将代码跟踪到 paramiko,现在我确定这是服务器问题。

1. prefetch做了什么

为了提高下载速度,paramiko 尝试通过 fetch 方法预取文件。当SFTP_FILE.prefetch()方法被调用时,会创建一个新线程,并将吨 fetch 请求发送到服务器 util 覆盖整个文件。
我们可以在文件 paramiko/sftp_file.py 的第 464 行附近找到它。

2.如何确定是服务器问题

上面提到的请求是在异步模式下运行的。SFTP_FILE._async_response()用于接收来自服务器的异步响应。跟踪代码,我们可以发现这个异常是在SFTP_FILE._async_response()从服务器发送的消息转换的方法中创建的。现在,我们可以确定这是来自服务器的异常。

3.如何解决问题

因为我无法访问服务器,所以在命令行中使用 sftp 是我最好的选择。但是另一方面,现在我们知道请求过多会使服务器崩溃,所以我们可以在向服务器发送请求时休眠.

于 2017-02-15T12:14:24.373 回答
-1

我也遇到了类似的问题。

Traceback (most recent call last):
File "---", line 948, in <module>
main()
File "---", line 937, in main
args.sshProxyKeyfile)
File "---", line 904, in Bootstrap
CopyFiles(client, builds, k8sVer)
File "---", line 393, in CopyWcpFilesToVC
ScpWithClient(client, __file__, __file__)
File "---", line 621, in ScpWithClient
with client.open_sftp() as sftp:
File "---_env.Linux/lib64/python3.5/site-packages/paramiko/client.py", line 556, in open_sftp
return self._transport.open_sftp_client()
File "---_env.Linux/lib64/python3.5/site-packages/paramiko/transport.py", line 1097, in open_sftp_client
return SFTPClient.from_transport(self)
File "---_env.Linux/lib64/python3.5/site-packages/paramiko/sftp_client.py", line 170, in from_transport
return cls(chan)
File "---_env.Linux/lib64/python3.5/site-packages/paramiko/sftp_client.py", line 130, in __init__
server_version = self._send_version()
File "---_env.Linux/lib64/python3.5/site-packages/paramiko/sftp.py", line 134, in _send_version
t, data = self._read_packet()
File "---_env.Linux/lib64/python3.5/site-packages/paramiko/sftp.py", line 205, in _read_packet
raise SFTPError("Garbage packet received")
paramiko.sftp.SFTPError: Garbage packet received

原因是 bash 不是与用户登录关联的默认 shell。使用永久更改用户的默认外壳chsh -s /bin/bash <user>解决了该问题。

于 2020-12-10T06:12:37.960 回答