I'm trying to run a Cython file in parallel. The skeleton of my code is:
```cython
# distutils: language = c++
import gc
import math

import joblib
from joblib import Parallel, delayed
from tqdm import tqdm

from libcpp.map cimport map
from libcpp.set cimport set
from libcpp.string cimport string
from libcpp.vector cimport vector


def build_DF_single(lo, hi, map[long, set[string]] authors_id_map, map[long, set[string]] authors_org_map,
                    map[long, set[string]] fos_name_map, map[long, set[string]] publisher_map,
                    map[long, set[string]] year_map, map[long, set[long]] reference_map,
                    map[string, double] authors_id_prob, map[string, double] authors_org_prob,
                    map[string, double] fos_name_prob, map[string, double] publisher_prob,
                    map[string, double] year_prob, map[string, set[long]] authors_id_co,
                    map[string, set[long]] authors_org_co, map[string, set[long]] fos_name_co,
                    map[string, set[long]] publisher_co, map[string, set[long]] year_co,
                    map[long, vector[double]] doc2vec_map):
    for i in tqdm(range(lo, hi)):
        line = lines[i]
        # Data cleaning on <line>


def mmap(name):
    # Load a joblib dump read-only. mmap_mode only affects numpy arrays
    # stored in the dump; other objects are unpickled into memory as usual.
    d = joblib.load("mmap/" + name + ".mmap", mmap_mode="r")
    gc.collect()
    return d


authors_id_prob = mmap("authors_id_prob")
authors_org_prob = mmap("authors_org_prob")
fos_name_prob = mmap("fos_name_prob")
publisher_prob = mmap("publisher_prob")
year_prob = mmap("year_prob")
authors_id_co = mmap("authors_id_co")
authors_org_co = mmap("authors_org_co")
fos_name_co = mmap("fos_name_co")
publisher_co = mmap("publisher_co")
year_co = mmap("year_co")
doc2vec_map = mmap("doc2vec_map")

with open("file", "r") as f:
    lines = f.readlines()  # Pretty large as well

# n_cpu and the *_map objects not loaded above are defined elsewhere.
# float() keeps the ceiling correct on Python 2, where int / int truncates.
batch_size = int(math.ceil(len(lines) / float(n_cpu)))
results = Parallel(n_jobs=n_cpu, prefer="threads", max_nbytes=None)(
    delayed(build_DF_single)(
        i * batch_size, min((i + 1) * batch_size, len(lines)),
        authors_id_map, authors_org_map, fos_name_map, publisher_map, year_map, reference_map,
        authors_id_prob, authors_org_prob, fos_name_prob, publisher_prob, year_prob,
        authors_id_co, authors_org_co, fos_name_co, publisher_co, year_co, doc2vec_map
    )
    for i in range(n_cpu)
)
```
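The lo/hi split in the skeleton can be checked in isolation. `batch_bounds` below is a hypothetical pure-Python helper, not part of the original code; it is a minimal sketch that reproduces the `batch_size` and `min(...)` arithmetic from the `Parallel` call, assuming `n_workers` plays the role of `n_cpu` and `n_items` the role of `len(lines)`:

```python
import math


def batch_bounds(n_items, n_workers):
    """Hypothetical helper: return the half-open (lo, hi) ranges that the
    skeleton hands to build_DF_single, one per worker."""
    # float() keeps the ceiling correct on Python 2, where int / int truncates.
    batch_size = int(math.ceil(n_items / float(n_workers)))
    return [
        (i * batch_size, min((i + 1) * batch_size, n_items))
        for i in range(n_workers)
    ]


if __name__ == "__main__":
    print(batch_bounds(10, 4))  # [(0, 3), (3, 6), (6, 9), (9, 10)]
```

With plain integer division on Python 2 (the traceback shows python2.7), `len(lines) / n_cpu` truncates before `math.ceil` runs: for 10 lines and 4 workers `batch_size` would be 2, and lines 8 and 9 would never be processed. When there are fewer lines than workers, trailing ranges can come out with `lo >= hi` (for example `(3, 2)`), which `range(lo, hi)` simply treats as empty.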
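Here, authors_id_map, authors_org_map, fos_name_map, publisher_map, year_map, reference_map, authors_id_prob, authors_org_prob, fos_name_prob, publisher_prob, year_prob, authors_id_co, authors_org_co, fos_name_co, publisher_co, year_co and doc2vec_map are all very large C++ maps. Since I don't want them to be copied into separate (forked) processes, I turned them into memory maps. However, when my code reaches the Parallel() call, I end up with the following error: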
```
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "buildDF.pyx", line 473, in buildDF.build_DF
    results = Parallel(n_jobs = n_cpu, require="sharedmem", prefer="threads", max_nbytes=None)(delayed(build_DF_single)(
  File "/home/zhangji/.local/lib/python2.7/site-packages/joblib/parallel.py", line 1004, in __call__
    if self.dispatch_one_batch(iterator):
  File "/home/zhangji/.local/lib/python2.7/site-packages/joblib/parallel.py", line 808, in dispatch_one_batch
    islice = list(itertools.islice(iterator, big_batch_size))
  File "buildDF.pyx", line 475, in genexpr
    authors_id_map, authors_org_map, fos_name_map, publisher_map, year_map, reference_map, authors_id_prob, authors_org_prob, fos_name_prob, publisher_prob, year_prob, authors_id_co, authors_org_co, fos_name_co, publisher_co, year_co, doc2vec_map
  File "stringsource", line 207, in map.to_py.__pyx_convert_map_to_py_std_3a__3a_string____double
MemoryError
```
Can anyone tell me what is going on? And what is "stringsource"?
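The last frames of the traceback point at `map.to_py.__pyx_convert_map_to_py_std_3a__3a_string____double`, called from the generator expression at buildDF.pyx line 475, which joblib consumes with `itertools.islice`. Assuming that frame is Cython converting a `map[string, double]` into a Python dict (as its name suggests), the pure-Python sketch below mimics the pattern. `to_py`, the stand-in `delayed`, `build_single`, `per_task_copies` and `shared_copy` are all hypothetical, and no joblib or Cython code is involved; it only shows that an argument expression inside a generator is re-evaluated for every task, so a converting call there produces one full copy per task.

```python
from itertools import islice

# Hypothetical counter so the example can show how often conversion runs.
conversion_calls = [0]


def to_py(cpp_map):
    """Hypothetical stand-in for Cython's generated map -> dict converter.

    Each call builds a brand-new dict holding a full copy of the entries.
    """
    conversion_calls[0] += 1
    return dict(cpp_map)


def delayed(func):
    """Hypothetical stand-in for joblib.delayed: just records (func, args)."""
    def capture(*args):
        return func, args
    return capture


def build_single(lo, hi, prob_map):
    return hi - lo


def per_task_copies(cpp_map, n_tasks):
    # Mirrors the original generator: the map argument is converted again
    # for every task, so n_tasks separate copies exist at the same time.
    tasks = (delayed(build_single)(i, i + 1, to_py(cpp_map))
             for i in range(n_tasks))
    # joblib's dispatch_one_batch also materialises tasks with islice.
    return list(islice(tasks, n_tasks))


def shared_copy(cpp_map, n_tasks):
    # Converting once before building the tasks gives every task the same dict.
    py_map = to_py(cpp_map)
    return [delayed(build_single)(i, i + 1, py_map) for i in range(n_tasks)]


if __name__ == "__main__":
    fake_map = {"a": 0.5, "b": 0.25}  # pretend this is a map[string, double]
    copies = per_task_copies(fake_map, 4)
    print(conversion_calls[0])                 # 4
    print(copies[0][1][2] is copies[1][1][2])  # False
    shared = shared_copy(fake_map, 4)
    print(conversion_calls[0])                 # 5
    print(shared[0][1][2] is shared[1][1][2])  # True
```

In the real skeleton this pattern would mean up to `n_cpu` dict copies of each `map[...]` argument alive during dispatch. Converting once before the generator, as `shared_copy` does, is one way to avoid that duplication, although the typed `map[...]` parameters of `build_DF_single` would presumably still convert each dict back into a C++ map on every call.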
Thanks!