0

我有一个查找表LUT,它是一个非常大的字典(24G)。我有数百万个输入来对其执行查询。

我想将数百万个输入拆分为 32 个作业,并并行运行它们。由于空间限制,我无法运行多个 python 脚本,因为这会导致内存过载。

我想使用该multiprocessing模块只加载LUT一次,然后让不同的进程查找它,同时将它作为全局变量共享,而不必复制它。

但是,当我查看 时htop,似乎每个子进程都在重新创建LUT? 我提出这个要求是因为在VIRT, RES, SHR. 数字非常高。但与此同时,我没有看到该Mem行中使用的额外内存,它从 11Gb 增加到 12.3G 并且只是悬停在那里。

所以我很困惑,是它,还是不是LUT在每个子流程中重新创建?我应该如何继续确保我正在运行并行工作,而不在每个子进程中复制 LUT?代码如下图所示。

在此处输入图像描述 (在这个实验中,我只使用了 1Gb 的 LUT,所以不用担心它不是 24Gb)

import os, sys, time, pprint, pdb, datetime
import threading, multiprocessing

## Print the process/thread details
def getDetails(idx):
    pid = os.getpid()
    threadName = threading.current_thread().name
    processName = multiprocessing.current_process().name
    print(f"{idx})\tpid={pid}\tprocessName={processName}\tthreadName={threadName} ")
    return pid, threadName, processName

def ComplexAlgorithm(value):
    # Instead of just lookup like this
    # the real algorithm is some complex algorithm that performs some search
    return value in LUT

## Querying the 24Gb LUT from my millions of lines of input
def PerformMatching(idx, NumberOfLines):
    pid, threadName, processName = getDetails(idx)
    NumberMatches = 0
    for _ in range(NumberOfLines):
        # I will actually read the contents from my file live, 
        # but here just assume i generate random numbers
        value = random.randint(-100, 100)
        if ComplexAlgorithm(value): NumberMatches += 1
    print(f"\t{idx}) | LUT={len(LUT)} | NumberMatches={NumberMatches} | done")

if __name__ == "__main__":

    ## Init
    num_processes = 9
    # this is just a pseudo-call to show you the structure of my LUT, the real one is larger
    LUT = (dict(i,set([i])) for i in range(1000))

    ## Store the multiple filenames
    ListOfLists = []
    for idx in range(num_processes):
        NumberOfLines = 10000
        ListOfLists.append( NumberOfLines )

    ## Init the processes
    ProcessList = []
    for processIndex in range(num_processes):
        ProcessList.append( 
            multiprocessing.Process(
                target=PerformMatching, 
                args=(processIndex, ListOfLists[processIndex])
            )
        )
        ProcessList[processIndex].start()

    ## Wait until the process terminates.
    for processIndex in range(num_processes):
        ProcessList[processIndex].join()

    ## Done
4

1 回答 1

0

如果你想走使用 a 的路线multiprocessing.Manager,这就是你可以做到的。权衡是字典由对存在于不同地址空间中的实际字典的代理的引用表示,因此每个字典引用都会导致相当于远程过程调用。换句话说,与“常规”字典相比,访问速度要慢得多。

在下面的演示程序中,我只为我的托管字典定义了几个方法,但是您可以定义任何您需要的方法。我还使用了多处理池,而不是显式启动单个进程;你可以考虑这样做。

from multiprocessing.managers import BaseManager, BaseProxy
from multiprocessing import Pool
from functools import partial

def worker(LUT, key):
    return LUT[key]


class MyDict:
    def __init__(self):
        """ initialize the dictionary """
        # the very large dictionary reduced for demo purposes:
        self._dict = {i: i for i in range(100)}

    def get(self, obj, default=None):
        """ delegates to underlying dict """
        return self._dict.get(obj, default)

    def __getitem__(self, obj):
        """ delegates to underlying dict """
        return self._dict[obj]

class MyDictManager(BaseManager):
    pass

class MyDictProxy(BaseProxy):
    _exposed_ = ('get', '__getitem__')

    def get(self, *args, **kwargs):
        return self._callmethod('get', args, kwargs)

    def __getitem__(self, *args, **kwargs):
        return self._callmethod('__getitem__', args, kwargs)


def main():
    MyDictManager.register('MyDict', MyDict, MyDictProxy)
    with MyDictManager() as manager:
        my_dict = manager.MyDict()
        pool = Pool()
        # pass proxy instead of actual LUT:
        results = pool.map(partial(worker, my_dict), range(100))
        print(sum(results))

if __name__ == '__main__':
    main()

印刷:

4950

讨论

Python 带有一个dict内置的托管类,可通过multiprocessing.Manager().dict(). 但是根据我先前的评论,使用这样的字典初始化如此大量的条目将非常低效,即每次访问都会相对昂贵。在我看来,创建我们自己的托管类会更便宜,它有一个底层的“常规”字典,可以在构建托管类时直接初始化,而不是通过代理引用。虽然dictPython 附带的托管确实可以使用已经构建的字典进行实例化,从而避免了效率低下的问题,我担心内存效率会受到影响,因为您将拥有字典的两个实例,即“常规”字典和“托管”字典。

于 2021-04-30T11:25:59.907 回答