I have a very large iterator that returns large amounts of data (file contents), so consuming it eagerly exhausts all of my RAM within seconds. Python's multiprocessing.Pool().imap(...) is documented as iterating lazily: it pulls one value from the iterator, hands it to a worker, and waits for the worker to finish before pulling the next. That is exactly what I want.
However, for some reason it keeps pulling values from the iterator even when the maximum number of workers is already busy. Here is my code:
import os
import scandir  # third-party scandir module
from multiprocessing.pool import ThreadPool

class FileNameIterator(object):  # Support class for TupleIterator
    def __init__(self, path):
        self.scanner = scandir.scandir(path)

    def __iter__(self):
        return self

    def next(self):
        # Skip directories; return only names of regular files.
        while True:
            direntry = self.scanner.next()
            if os.path.isfile(direntry.path):
                return direntry.name

class ContentIterator(object):  # Support class for TupleIterator
    def __init__(self, filenameiter, path):
        self.filenameiter = filenameiter
        self.path = path

    def __iter__(self):
        return self

    def next(self):
        print "<iter> reading content."  # TODO: remove
        with open(os.path.join(self.path, self.filenameiter.next())) as f:
            return f.read()  # the with block closes f; no explicit close needed

class TupleIterator(object):  # Basically izip with working __len__
    def __init__(self, path):
        self.fnameiter = FileNameIterator(path)
        self.cntiter = ContentIterator(FileNameIterator(path), path)
        self.path = path

    def __iter__(self):
        return self

    def next(self):
        return self.fnameiter.next(), self.cntiter.next()

    def __len__(self):
        return len([name for name in os.listdir(self.path)
                    if os.path.isfile(os.path.join(self.path, name))])

pool = ThreadPool(12)       # Has same API as Pool(), but allows memory sharing
itr = TupleIterator(_path)  # Basically izip with working __len__
with open(_datafile, 'w') as datafile:  # Workers write results to a file
    pool.imap_unordered(worker, itr, len(itr)/12)  # This does not work as expected
    pool.close()
    pool.join()
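To rule out a bug in my own iterator classes, I also tried a stripped-down repro (simplified sketch; slow_worker and counting_iter are just stand-ins, not part of my real code) and it shows the same thing: every "producing" line prints almost immediately, long before the two workers could have processed the items:

import time
from multiprocessing.pool import ThreadPool

def slow_worker(x):
    time.sleep(1)  # stand-in for real per-item work
    return x

def counting_iter(n):
    for i in xrange(n):
        print "<iter> producing", i  # fires far ahead of the workers
        yield i

pool = ThreadPool(2)
for _ in pool.imap_unordered(slow_worker, counting_iter(100)):
    pass
pool.close()
pool.join()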
I have the workers print a message when they start and finish, and the iterator prints whenever it reads a file. The output shows that the iterator keeps reading files far faster than the workers can process them.
How do I fix this? Is the imap(..) function working correctly and I am simply misunderstanding how it is supposed to work?
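One workaround I am considering (untested sketch; ThrottledIterator, max_pending, and task_done are my own names, not part of multiprocessing) is to wrap the iterator in a semaphore so that next() blocks once a fixed number of items are in flight, and each worker releases a slot when it finishes:

import threading

class ThrottledIterator(object):
    """Blocks next() while max_pending items are still being processed."""
    def __init__(self, it, max_pending):
        self.it = it
        self.sem = threading.Semaphore(max_pending)

    def __iter__(self):
        return self

    def next(self):
        self.sem.acquire()      # wait until a slot is free
        return self.it.next()

    def task_done(self):
        self.sem.release()      # worker calls this when done with an item

Each worker would call itr.task_done() in a finally block, which works with ThreadPool because the workers share the iterator object. But this feels like reimplementing exactly what imap(..) already promises to do.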