3

在我的项目中,我使用pybind11将 C++ 代码绑定到 Python。最近我不得不处理非常大的数据集(70GB+),并且遇到了需要从std::deque多个std::deque's 之间拆分数据。由于我的数据集非常大,我希望拆分不会有太多的内存开销。因此,我选择了一波一推的策略,这通常可以确保满足我的要求。

这都是理论上的。在实践中,我的进程被杀死了。所以我在过去的两天里一直在努力,最终想出了以下最小的例子来证明这个问题。

通常,最小示例会在 (~11GB) 中创建一堆数据deque,将其返回给 Python,然后再次调用 toC++以移动元素。就那么简单。移动部分在执行器中完成。

有趣的是,如果我不使用 executor,内存使用量与预期相同,并且当 ulimit 对虚拟内存施加限制时,程序确实尊重这些限制并且不会崩溃。

测试.py

from test import _test
import asyncio
import concurrent

async def test_main(loop, executor):
    numbers = _test.generate()
    # moved_numbers = _test.move(numbers) # This works!
    moved_numbers = await loop.run_in_executor(executor, _test.move, numbers) # This doesn't!

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    executor = concurrent.futures.ThreadPoolExecutor(1)

    task = loop.create_task(test_main(loop, executor))
    loop.run_until_complete(task)

    executor.shutdown()
    loop.close()

测试.cpp

#include <deque>
#include <iostream>
#include <pybind11/pybind11.h>
#include <pybind11/stl.h>

namespace py = pybind11;

PYBIND11_MAKE_OPAQUE(std::deque<uint64_t>);
PYBIND11_DECLARE_HOLDER_TYPE(T, std::shared_ptr<T>);

template<class T>
void py_bind_opaque_deque(py::module& m, const char* type_name) {
    py::class_<std::deque<T>, std::shared_ptr<std::deque<T>>>(m, type_name)
    .def(py::init<>())
    .def(py::init<size_t, T>());
}

PYBIND11_PLUGIN(_test) {
    namespace py = pybind11;
    pybind11::module m("_test");
    py_bind_opaque_deque<uint64_t>(m, "NumbersDequeue");

    // Generate ~11Gb of data.
    m.def("generate", []() {
        std::deque<uint64_t> numbers;
        for (uint64_t i = 0; i < 1500 * 1000000; ++i) {
            numbers.push_back(i);
        }
        return numbers;
    });

    // Move data from one dequeue to another.
    m.def("move", [](std::deque<uint64_t>& numbers) {
        std::deque<uint64_t> numbers_moved;

        while (!numbers.empty()) {
            numbers_moved.push_back(std::move(numbers.back()));
            numbers.pop_back();
        }
        std::cout << "Done!\n";
        return numbers_moved;
    });

    return m.ptr();
}

测试/__init__.py

import warnings
warnings.simplefilter("default")

编译

g++ -std=c++14 -O2 -march=native -fPIC -Iextern/pybind11 `python3.5-config --includes` `python3.5-config --ldflags` `python3.5-config --libs` -shared -o test/_test.so test.cpp

观察:

  • 当移动部分不是由 executor 完成时,我们只需调用moved_numbers = _test.move(numbers),一切都按预期工作,htop 显示的内存使用情况保持不变11Gb,太棒了!
  • 在执行器中完成移动部分时,程序会占用双倍的内存并崩溃。
  • 当引入虚拟内存限制(~15Gb)时,一切正常,这可能是最有趣的部分。

    ulimit -Sv 15000000 && python3.5 test.py>> Done!.

  • 当我们增加限制时,程序崩溃(150Gb > 我的 RAM)。

    ulimit -Sv 150000000 && python3.5 test.py>>[1] 2573 killed python3.5 test.py

  • 使用 deque 方法shrink_to_fit没有帮助(也不应该)

用过的软件

Ubuntu 14.04
gcc version 5.4.1 20160904 (Ubuntu 5.4.1-2ubuntu1~14.04)
Python 3.5.2
pybind11 latest release - v1.8.1

笔记

请注意,此示例仅用于演示问题。发生问题需要 asyncio和的使用。pybind

任何关于可能发生的事情的想法都非常受欢迎。

4

1 回答 1

1

这个问题原来是由在一个线程中创建数据然后在另一个线程中释放引起的。之所以如此,是因为 glibc 中的 malloc arenas (供参考,请参阅此)。可以通过以下方式很好地证明:

executor1 = concurrent.futures.ThreadPoolExecutor(1)
executor2 = concurrent.futures.ThreadPoolExecutor(1)

numbers = await loop.run_in_executor(executor1, _test.generate)
moved_numbers = await loop.run_in_executor(executor2, _test.move, numbers)

这将占用_test.generate和分配的内存的两倍

executor = concurrent.futures.ThreadPoolExecutor(1)

numbers = await loop.run_in_executor(executor, _test.generate)
moved_numbers = await loop.run_in_executor(executor, _test.move, numbers)

哪个伤口没有。

这个问题可以通过重写代码来解决,这样它就不会将元素从一个容器移动到另一个容器(我的情况),或者通过设置环境变量export MALLOC_ARENA_MAX=1将 malloc arenas 的数量限制为 1。然而,这可能会涉及一些性能影响(拥有多个竞技场是有充分理由的)。

于 2016-10-16T09:41:51.113 回答