-1

我正在尝试并行执行 cython 文件。我的代码的骨架是:

def build_DF_single(lo, hi, map[long, set[string]] authors_id_map, map[long, set[string]] authors_org_map, 
    map[long, set[string]] fos_name_map, map[long, set[string]] publisher_map, 
    map[long, set[string]] year_map, map[long, set[long]] reference_map, 
    map[string, double] authors_id_prob, map[string, double] authors_org_prob, 
    map[string, double] fos_name_prob, map[string, double] publisher_prob, 
    map[string, double] year_prob, map[string, set[long]] authors_id_co, 
    map[string, set[long]] authors_org_co, map[string, set[long]] fos_name_co, 
    map[string, set[long]] publisher_co, map[string, set[long]] year_co, 
    map[long, vector[double]] doc2vec_map):

    for i in tqdm(range(lo, hi)):
        line = lines[i]
        # Data cleaning on <line>

def mmap(name):
    d = joblib.load("mmap/" + name + ".mmap", mmap_mode="r")
    gc.collect()
    return d

authors_id_prob = mmap("authors_id_prob")
authors_org_prob = mmap("authors_org_prob")
fos_name_prob = mmap("fos_name_prob")
publisher_prob = mmap("publisher_prob")
year_prob = mmap("year_prob")
authors_id_co = mmap("authors_id_co")
authors_org_co = mmap("authors_org_co")
fos_name_co = mmap("fos_name_co")
publisher_co = mmap("publisher_co")
year_co = mmap("year_co")
doc2vec_map = mmap("doc2vec_map")

with open("file", "r") as f:
    lines = f.readlines() # Pretty large as well
batch_size = int(math.ceil(len(lines) / n_cpu))
results = Parallel(n_jobs = n_cpu, prefer="threads", max_nbytes=None)(delayed(build_DF_single)(
    (i * batch_size), min((i + 1) * batch_size, len(lines)),
    authors_id_map, authors_org_map, fos_name_map, publisher_map, year_map, reference_map, authors_id_prob, authors_org_prob, fos_name_prob, publisher_prob, year_prob, authors_id_co, authors_org_co, fos_name_co, publisher_co, year_co, doc2vec_map
) for i in range(n_cpu))

其中 authors_id_map、authors_org_map、fos_name_map、publisher_map、year_map、reference_map、authors_id_prob、authors_org_prob、fos_name_prob、publisher_prob、year_prob、authors_id_co、authors_org_co、fos_name_co、publisher_co、year_co、doc2vec_map 都是非常大的 c++ 地图。由于我不想将它们分叉到不同的进程,因此我将它们制作为内存映射。但是,当我的代码到达 Parallel() 部分时,我最终会收到以下错误:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "buildDF.pyx", line 473, in buildDF.build_DF
    results = Parallel(n_jobs = n_cpu, require="sharedmem", prefer="threads", max_nbytes=None)(delayed(build_DF_single)(
  File "/home/zhangji/.local/lib/python2.7/site-packages/joblib/parallel.py", line 1004, in __call__
    if self.dispatch_one_batch(iterator):
  File "/home/zhangji/.local/lib/python2.7/site-packages/joblib/parallel.py", line 808, in dispatch_one_batch
    islice = list(itertools.islice(iterator, big_batch_size))
  File "buildDF.pyx", line 475, in genexpr
    authors_id_map, authors_org_map, fos_name_map, publisher_map, year_map, reference_map, authors_id_prob, authors_org_prob, fos_name_prob, publisher_prob, year_prob, authors_id_co, authors_org_co, fos_name_co, publisher_co, year_co, doc2vec_map
  File "stringsource", line 207, in map.to_py.__pyx_convert_map_to_py_std_3a__3a_string____double
MemoryError

谁能告诉我发生了什么?什么是“字符串源”?

谢谢!

4

1 回答 1

1

跟进我的评论:

的引用stringsource有点令人困惑。它指的是由 Cython 内部生成的用 Cython 编写的实用程序代码。在这种情况下,调用实用程序代码将您的 Python 类映射类型转换为 C++ std::map<std::string, double>( __pyx_convert_map_to_py_std_3a__3a_string____double)。

基本问题看起来是你试图做两件相互矛盾的事情:

  1. 您正在使用 Python 包装mmap来加载一组大文件,而无需一次将它们全部加载到内存中。

  2. 您正在将所有数据转换为 C++ std::map,大概是因为您希望它会更快,或者它可以在没有 GIL 的情况下运行。

转换std::map为不是“透明包装” - 它制作了一个全新的副本。因此,所有数据都从您的内存中加载mmap并直接加载到内存中 - 因此(我认为 - 当您没有提供最小的可重现示例MemoryError时,很难确定)。

没有明显的解决方案。要么您需要坚持使用 Python 对象以便您mmap可以保留,要么您需要自己实现一个可以加载数据mmap样式的 C++ 类以避免转换为std::map.

于 2020-07-11T17:05:42.157 回答