我有一个我认为是相当简单的问题,但我似乎无法让它像我希望的那样工作。我有 ~200 ~200 MB 文件,足以让我无法一次将它们全部加载到内存中。每个文件需要与其他文件一起处理一次(因此处理函数的操作约为 20000 次)。我需要多次循环文件,例如:
for i in xrange(len(file_list)-1): #go through each file
if i==0:
f = read(file_list[i]) #load in the first file
else:
f = g0 #we've already loaded in the next file once, don't re-load it
for j in xrange(i+1,len(file_list)): #go through each other file to get all unique pairs
g = read(file_list[i+j+1]) # takes 5 s
answer.append(processing_function(f,g)) # takes 10s
if j==0:
g = g0 # this will be "f" for the next iteration of the outer loop
外循环只需要加载文件 1,然后它可以从内循环的第一次迭代中获取一个已经加载的文件作为它的下一个值。内部循环需要加载 (len(file_list)-i) 文件并处理它们。由于我无法在内存中保留约 200 个文件,并且没有简单的方法来分解文件列表(因为每个文件都需要彼此配对一次)
是否有一种明显的方法可以并行化 read() 函数,使其至少读取下一个文件,即在 processing_function(f_i,g_j) 执行期间 g_j+1?我已经尝试过线程和多处理,并且我已经能够将读取卸载到不同的线程/进程中,但它永远不会并行运行(即 processing_function() 的执行在我得到 ag 后立即开始,而未来的 g_j 正在后台加载)。我要么设法挂起程序(使用多处理),要么只是在 processing_function() 在第一个文件上运行之前将一堆文件加载到队列中(使用线程)。
我希望这很容易执行,我只是弄乱了一个细节。
这是我迄今为止尝试过的(来自有人在其他地方建议的代码) -看起来这应该做我想要的,但就像我说的那样,QueuedFileReader() 中的 file_worker 方法似乎通过并达到 QUEUE_SIZE在将控制权交还给 QueuedFileReader 作为生成器的循环之前,将文件放入队列中。如果 file_worker 仅自行运行并且使用 QueuedFileReader() 的循环可以在队列中准备好用于 next() 调用的单个文件时继续,那将是完美的...
class QueuedFileReader():
def __init__(self,file_list,tmp_dir,QUEUE_SIZE):
self.queue =Queue.Queue(QUEUE_SIZE)
self.worker = threading.Thread(target=QueuedFileReader.file_worker,args=(self.queue,file_list,tmp_dir))
#self.queue = multiprocessing.Queue(QUEUE_SIZE)
#self.worker = multiprocessing.Process(target=QueuedFileReader.file_worker,args=(self.queue,file_list,tmp_dir))
self.worker.daemon=True
self.worker.start()
@staticmethod
def file_worker(queue,file_list,tmp_dir):
for i,f in enumerate(file_list):
done = False
while True and not done:
try:
print "attempting to read %s"%f_in
queue.put((i,f_in,files.read(f)))
print "successfully read in %s"%f
done = True
except Queue.Full:
pass
queue.put('done')
def __iter__(self):
return self
def next(self):
N = 0
while True:
try:
x = self.queue.get()
break
except Queue.Empty:
pass
if x == 'done': raise StopIteration
return x
def __del__(self):
self.worker.join()
这被称为:
for j,fnm,g in QueuedFileReader(flz[i+1:],tmp_dir,QUEUE_SIZE):
#the code that runs on object "f" from the outer loop and "g" that should be supplied off the top of the queue on each next() call to QueuedFileReader()