I have a lookup table (LUT) that is a very large dictionary (24G), and I have millions of inputs to run queries against it. I want to split the millions of inputs into 32 jobs and run them in parallel. Because of space constraints, I cannot run multiple Python scripts, as that would overload memory. I want to use the multiprocessing module to load the LUT only once and then have the different processes look it up, sharing it as a global variable without having to copy it.
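To make the intent concrete, here is a minimal sketch of the pattern I am aiming for (not my real code): it assumes Linux's fork start method, where children inherit the parent's address space copy-on-write, so a module-level LUT should be readable from every worker without being pickled or duplicated. The build_lut helper and the pool size are placeholders.

import multiprocessing

LUT = None  # populated once in the parent before any worker is forked

def build_lut():
    # placeholder for loading the real 24Gb dictionary from disk
    return {i: {i} for i in range(1000)}

def worker(value):
    # reads the global inherited from the parent via fork;
    # the dict itself is never passed to the child (and thus never pickled)
    return value in LUT

if __name__ == "__main__":
    LUT = build_lut()
    ctx = multiprocessing.get_context("fork")  # fork => copy-on-write pages
    with ctx.Pool(processes=32) as pool:
        matches = pool.map(worker, range(1000))
    print(f"matches={sum(matches)}")

As long as the workers only read the LUT, the kernel should keep those pages shared; it is writes to the pages that trigger physical copies.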
However, when I look at htop, it seems as though every child process is recreating the LUT? I say this because the VIRT, RES, and SHR numbers are very high. At the same time, I don't see the corresponding extra memory used in the Mem row: it rises from 11Gb to 12.3Gb and just hovers there.
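One way I could think of to check whether pages are really being copied (rather than reading RES/SHR, which count shared pages in every process that maps them) is each child's unique set size (USS). The sketch below uses the third-party psutil package, which would be an extra dependency, not something already in my code:

import os
import psutil

def report_memory(tag):
    # USS counts only the pages unique to this process; pages still shared
    # copy-on-write with the parent are excluded, unlike RES/SHR in htop
    info = psutil.Process(os.getpid()).memory_full_info()
    print(f"{tag}: uss={info.uss / 1e9:.2f}Gb rss={info.rss / 1e9:.2f}Gb")

If USS stays small in each child while RSS looks huge, the LUT pages are still shared and nothing has actually been duplicated.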
So I'm confused: is the LUT being recreated in each child process or not? And how should I proceed to make sure the jobs run in parallel without copying the LUT into every child process? The code is shown below.

(For this experiment I only used a 1Gb LUT, so don't worry that it isn't 24Gb.)
import os, sys, time, pprint, pdb, datetime
import threading, multiprocessing
import random

## Print the process/thread details
def getDetails(idx):
    pid = os.getpid()
    threadName = threading.current_thread().name
    processName = multiprocessing.current_process().name
    print(f"{idx})\tpid={pid}\tprocessName={processName}\tthreadName={threadName} ")
    return pid, threadName, processName

def ComplexAlgorithm(value):
    # Instead of a plain lookup like this,
    # the real algorithm performs some complex search over the LUT
    return value in LUT

## Querying the 24Gb LUT with my millions of lines of input
def PerformMatching(idx, NumberOfLines):
    pid, threadName, processName = getDetails(idx)
    NumberMatches = 0
    for _ in range(NumberOfLines):
        # In reality I read the contents from my file live,
        # but here just assume I generate random numbers
        value = random.randint(-100, 100)
        if ComplexAlgorithm(value): NumberMatches += 1
    print(f"\t{idx}) | LUT={len(LUT)} | NumberMatches={NumberMatches} | done")

if __name__ == "__main__":
    ## Init
    num_processes = 9
    # this is just a placeholder to show the structure of my LUT; the real one is larger
    LUT = {i: set([i]) for i in range(1000)}

    ## Store the per-job workloads (filenames in my real code; just line counts here)
    ListOfLists = []
    for idx in range(num_processes):
        NumberOfLines = 10000
        ListOfLists.append(NumberOfLines)

    ## Init the processes
    ProcessList = []
    for processIndex in range(num_processes):
        ProcessList.append(
            multiprocessing.Process(
                target=PerformMatching,
                args=(processIndex, ListOfLists[processIndex])
            )
        )
        ProcessList[processIndex].start()

    ## Wait until the processes terminate
    for processIndex in range(num_processes):
        ProcessList[processIndex].join()
    ## Done
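One caveat I've read about, and am not sure how much it applies here: even with fork, CPython writes to every object it touches (reference counts, GC flags), so merely looking values up in a shared dict can gradually fault copy-on-write pages into each child. Python 3.7+ has gc.freeze(), which is the commonly cited mitigation; a sketch, where load_lut is a hypothetical loader:

import gc

LUT = load_lut()  # hypothetical: load the 24Gb dict once in the parent
gc.freeze()       # move existing objects to a permanent generation so the
                  # cyclic GC stops rewriting their headers; refcount writes
                  # on access can still copy some pages, but far fewer
# ... fork/start the worker processes only after this point ...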