
I'm trying to share an existing object across multiple processes using the proxy methods described here. My multiprocessing idiom is a worker/queue setup, modeled on the fourth example here.

The code needs to do some calculations on data that is stored in rather large files on disk. I have a class that encapsulates all the I/O interaction; once it has read a file from disk, it keeps the data in memory in case the next task needs the same data (which happens often).

I thought I had everything working based on reading the examples linked above. Here is a mock-up of the code that just uses numpy random arrays to model the disk I/O:

import numpy
from multiprocessing import Process, Queue, current_process, Lock
from multiprocessing.managers import BaseManager

nfiles = 200
njobs = 1000

class BigFiles:

    def __init__(self, nfiles):
        # Start out with nothing read in.
        self.data = [ None for i in range(nfiles) ]
        # Use a lock to make sure only one process is reading from disk at a time.
        self.lock = Lock()

    def access(self, i):
        # Get the data for a particular file.
        # In my real application, this function reads files from disk.
        # Here I mock it up with random numpy arrays.
        if self.data[i] is None:
            with self.lock:
                # Re-check inside the lock in case another process filled it in
                # between the first check and acquiring the lock.
                if self.data[i] is None:
                    self.data[i] = numpy.random.rand(1024, 1024)
        return self.data[i]

    def summary(self):
        return 'BigFiles: %d, %d Storing %d of %d files in memory'%(
                id(self),id(self.data),
                (len(self.data) - self.data.count(None)),
                len(self.data)  )


# I'm using a worker/queue setup for the multiprocessing:
def worker(input, output):
    proc = current_process().name
    for job in iter(input.get, 'STOP'):
        (big_files, i, ifile) = job
        data = big_files.access(ifile)
        # Do some calculations on the data
        answer = numpy.var(data)
        msg = '%s, job %d'%(proc, i)
        msg += '\n   Answer for file %d = %f'%(ifile, answer)
        msg += '\n   ' + big_files.summary()
        output.put(msg)

# A class that returns an existing object when called.
# This is my attempted workaround for the fact that Manager.register needs a callable.
class ObjectGetter:
    def __init__(self, obj):
        self.obj = obj
    def __call__(self):
        return self.obj

def main():
    # Prior to the place where I want to do the multiprocessing,
    # I already have a BigFiles object, which might have some data already read in.
    # (Here I start it out empty.)
    big_files = BigFiles(nfiles)
    print('Initial big_files.summary =', big_files.summary())

    # My attempt at making a proxy class to pass big_files to the workers
    class BigFileManager(BaseManager): 
        pass
    getter = ObjectGetter(big_files)
    BigFileManager.register('big_files', callable = getter)
    manager = BigFileManager()
    manager.start()

    # Set up the jobs:
    task_queue = Queue()
    for i in range(njobs):
        ifile = numpy.random.randint(0, nfiles)
        big_files_proxy = manager.big_files()
        task_queue.put( (big_files_proxy, i, ifile) )

    # Set up the workers
    nproc = 12
    done_queue = Queue()
    process_list = []
    for j in range(nproc):
        p = Process(target=worker, args=(task_queue, done_queue))
        p.start()
        process_list.append(p)
        task_queue.put('STOP')

    # Log the results
    for i in range(njobs):
        msg = done_queue.get()
        print(msg)

    print('Finished all jobs')
    print('big_files.summary =', big_files.summary())

    # Shut down the workers
    for j in range(nproc):
        process_list[j].join()
    task_queue.close()
    done_queue.close()

if __name__ == '__main__':
    main()

In some sense it works: everything is computed correctly, and the data read along the way is being cached. The only problem is that at the end, the big_files object has none of the files loaded. The final msg returned is:

Process-2, job 999.  Answer for file 198 = 0.083406
   BigFiles: 4303246400, 4314056248 Storing 198 of 200 files in memory

But after everything is finished, we have:

Finished all jobs
big_files.summary =  BigFiles: 4303246400, 4314056248 Storing 0 of 200 files in memory

So my question is: what happened to all the stored data? It claims to be using the same self.data according to id(self.data), yet it is empty now.

I would like the final state of big_files to include all the saved data it accumulated along the way, because I actually have to repeat this entire process several times, and I don't want to redo all the (slow) I/O every time.

I'm assuming it must have something to do with my ObjectGetter class. The examples of how to use BaseManager only show how to make a new object that will be shared, not how to share an existing object. Is there something wrong with the way I'm getting the existing big_files object? Can anyone suggest a better way to do this step?

Thanks a lot!

