I'm trying to share an existing object across multiple processes using the proxy approach described here. My multiprocessing idiom is the worker/queue setup, modeled on the fourth example here.
The code needs to do some calculations on data stored in rather large files on disk. I have a class that encapsulates all the I/O interaction; once it reads a file from disk, it keeps the data in memory so that the next time a task needs the same data (which happens frequently), it doesn't have to re-read it.
I thought I had everything working from reading the examples linked above. Here is a mock-up of the code, which models the disk I/O with random numpy arrays:
import numpy
from multiprocessing import Process, Queue, current_process, Lock
from multiprocessing.managers import BaseManager

nfiles = 200
njobs = 1000

class BigFiles:

    def __init__(self, nfiles):
        # Start out with nothing read in.
        self.data = [ None for i in range(nfiles) ]
        # Use a lock to make sure only one process is reading from disk at a time.
        self.lock = Lock()

    def access(self, i):
        # Get the data for a particular file.
        # In my real application, this function reads in files from disk.
        # Here I mock it up with random numpy arrays.
        if self.data[i] is None:
            with self.lock:
                self.data[i] = numpy.random.rand(1024, 1024)
        return self.data[i]

    def summary(self):
        return 'BigFiles: %d, %d    Storing %d of %d files in memory'%(
            id(self), id(self.data),
            (len(self.data) - self.data.count(None)),
            len(self.data) )

# I'm using a worker/queue setup for the multiprocessing:
def worker(input, output):
    proc = current_process().name
    for job in iter(input.get, 'STOP'):
        (big_files, i, ifile) = job
        data = big_files.access(ifile)
        # Do some calculations on the data
        answer = numpy.var(data)
        msg = '%s, job %d'%(proc, i)
        msg += '\n    Answer for file %d = %f'%(ifile, answer)
        msg += '\n    ' + big_files.summary()
        output.put(msg)

# A class that returns an existing object when called.
# This is my attempted workaround for the fact that Manager.register needs a callable.
class ObjectGetter:
    def __init__(self, obj):
        self.obj = obj
    def __call__(self):
        return self.obj

def main():
    # Prior to the place where I want to do the multiprocessing,
    # I already have a BigFiles object, which might have some data already read in.
    # (Here I start it out empty.)
    big_files = BigFiles(nfiles)
    print('Initial big_files.summary = ', big_files.summary())

    # My attempt at making a proxy class to pass big_files to the workers
    class BigFileManager(BaseManager):
        pass
    getter = ObjectGetter(big_files)
    BigFileManager.register('big_files', callable=getter)
    manager = BigFileManager()
    manager.start()

    # Set up the jobs:
    task_queue = Queue()
    for i in range(njobs):
        ifile = numpy.random.randint(0, nfiles)
        big_files_proxy = manager.big_files()
        task_queue.put( (big_files_proxy, i, ifile) )

    # Set up the workers, with one STOP sentinel per worker:
    nproc = 12
    done_queue = Queue()
    process_list = []
    for j in range(nproc):
        p = Process(target=worker, args=(task_queue, done_queue))
        p.start()
        process_list.append(p)
        task_queue.put('STOP')

    # Log the results
    for i in range(njobs):
        msg = done_queue.get()
        print(msg)

    print('Finished all jobs')
    print('big_files.summary = ', big_files.summary())

    # Shut down the workers
    for j in range(nproc):
        process_list[j].join()
    task_queue.close()
    done_queue.close()

if __name__ == '__main__':
    main()
In one sense, it works: it computes everything correctly, and it is caching the data it reads along the way. The only problem is that at the end, the big_files object has no files loaded. The final msg returned is:
Process-2, job 999
    Answer for file 198 = 0.083406
    BigFiles: 4303246400, 4314056248    Storing 198 of 200 files in memory
But then after it is all done, we have:
Finished all jobs
big_files.summary = BigFiles: 4303246400, 4314056248 Storing 0 of 200 files in memory
So my question is: what happened to all the stored data? It claims to be using the same self.data according to id(self.data), but that list is now empty.
I want the final state of big_files to include all the saved data it accumulated along the way, since I actually have to repeat this entire process many times, and I don't want to redo all the (slow) I/O each time.
I'm assuming it must have something to do with my ObjectGetter class. The examples of how to use BaseManager only show how to make a new object that will be shared, not how to share an existing one. So is there something wrong with how I'm getting hold of the existing big_files object? Can anyone suggest a better way to do this step?
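For contrast, the registration pattern that the examples do show creates a brand-new object inside the manager process, roughly like this (a minimal sketch with a made-up Counter class, not my real code):

```python
from multiprocessing.managers import BaseManager

class Counter:
    """A toy shared object: the real instance lives in the manager process."""
    def __init__(self):
        self.n = 0
    def increment(self):
        self.n += 1
    def value(self):
        return self.n

class CounterManager(BaseManager):
    pass

# The documented pattern: register a callable that *creates* a fresh object
# inside the manager process; callers only ever receive a proxy to it.
CounterManager.register('Counter', callable=Counter)

if __name__ == '__main__':
    manager = CounterManager()
    manager.start()
    proxy = manager.Counter()   # a new Counter, built in the manager process
    proxy.increment()
    print(proxy.value())        # prints 1 -- state lives in the manager process
    manager.shutdown()
```

Note that in this pattern the parent process never owns the real object, only a proxy, which is why it isn't obvious to me how to apply it when I start with an existing instance.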
Thanks very much!