17

我正在尝试并行下载整个 ftp 目录。

#!/usr/bin/python
import sys
import datetime
import os
from multiprocessing import Process, Pool
from ftplib import FTP
curYear=""
remotePath =""
localPath = ""

def downloadFiles (remotePath,localPath):
        splitted = remotePath.split('/');
        host= splitted[2]
        path='/'+'/'.join(splitted[3:])
        ftp = FTP(host)
        ftp.login()
        ftp.cwd(path)
        filenames =  ftp.nlst()
        total=len(filenames)
        i=0
        pool = Pool()
        for filename in filenames:
                        local_filename = os.path.join(localPath,filename)
                        pool.apply_async(downloadFile, (filename,local_filename,ftp))
                        #downloadFile(filename,local_filename,ftp);
                        i=i+1

        pool.close()
        pool.join()
        ftp.close()

def downloadFile(filename,local_filename,ftp):
        file = open(local_filename, 'wb')
        ftp.retrbinary('RETR '+ filename, file.write)
        file.close()

def getYearFromArgs():
        if len(sys.argv) >= 2 and sys.argv[1] == "Y":
                year = sys.argv[2]
                del sys.argv[1:2]
        else:
                year = str(datetime.datetime.now().year)
        return year

def assignGlobals():
        global p
        global remotePath
        global localPath
        global URL
        global host
        global user
        global password
        global sqldb
        remotePath = 'ftp://ftp3.ncdc.noaa.gov/pub/data/noaa/isd-lite/%s/' % (curYear)
        localPath = '/home/isd-lite/%s/' % (curYear)

def main():
        global curYear
        curYear=getYearFromArgs()
        assignGlobals()
        downloadFiles(remotePath,localPath)

if __name__ == "__main__":
        main()

但我得到了这个例外:

Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/lib64/python2.6/threading.py", line 532, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.6/threading.py", line 484, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks
    put(task)
TypeError: expected string or Unicode object, NoneType found

如果我注释掉这一行:

pool.apply_async(downloadFile, (filename,local_filename,ftp)

并删除此行的注释:

downloadFile(filename,local_filename,ftp);

然后它工作得很好,但它很慢而且不是多线程的。

4

2 回答 2

23

2014 年 5 月 9 日更新:

我已经确定了确切的限制。只要对象可以被Python 的 pickle 工具腌制,就可以跨进程边界将对象发送到工作进程。我在原始答案中描述的问题发生是因为我试图向工作人员发送文件句柄。一个快速的实验说明了为什么这不起作用:

>>> f = open("/dev/null")
>>> import pickle
>>> pickle.dumps(f)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/python2.7/pickle.py", line 1374, in dumps
    Pickler(file, protocol).dump(obj)
  File "/usr/lib/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/usr/lib/python2.7/pickle.py", line 306, in save
    rv = reduce(self.proto)
  File "/usr/lib/python2.7/copy_reg.py", line 70, in _reduce_ex
    raise TypeError, "can't pickle %s objects" % base.__name__
TypeError: can't pickle file objects

因此,如果您遇到导致您找到此 Stack Overflow 问题的 Python 错误,请确保您跨进程边界发送的所有内容都可以腌制。

原答案:

我回答有点晚了。但是,在尝试使用 Python 的多处理模块时,我遇到了与原始海报相同的错误消息。我将记录我的发现,以便任何偶然发现此线程的人都可以尝试。

在我的情况下,发生错误是因为我试图发送到工作人员池:我试图传递一个文件对象数组供池工作人员咀嚼。这显然太多了,无法在 Python 中跨进程边界发送。我通过发送指定输入和输出文件名字符串的池工作者字典解决了这个问题。

因此,您提供给函数的可迭代对象(例如apply_async(I used map()and imap_unordered()))似乎可以包含数字或字符串列表,甚至是详细的字典数据结构(只要值不是对象)。

在你的情况下:

pool.apply_async(downloadFile, (filename,local_filename,ftp))

ftp是一个可能导致问题的对象。作为一种解决方法,我建议将参数发送给工作人员(在这种情况下看起来像hostpath),并让工作人员实例化对象并处理清理工作。

于 2013-05-25T16:05:14.373 回答
-1

你有没有尝试过:

pool.apply_async(downloadFile, args=(filename,local_filename,ftp))

原型是:

apply_async(func, args=(), kwds={}, callback=None)
于 2013-01-10T06:28:44.590 回答