28

我正在为使用共享内存缓冲区存储其内部状态的 C 库编写 Python 绑定。这些缓冲区的分配和释放是由库本身在 Python 之外完成的,但我可以通过从 Python 中调用包装的构造函数/析构函数来间接控制何时发生这种情况。我想向 Python 公开一些缓冲区,以便我可以从中读取,并在某些情况下将值推送给它们。性能和内存使用是重要的问题,所以我想尽可能避免复制数据。

我目前的方法是创建一个 numpy 数组,它可以直接查看 ctypes 指针:

import numpy as np
import ctypes as C

libc = C.CDLL('libc.so.6')

class MyWrapper(object):

    def __init__(self, n=10):
        # buffer allocated by external library
        addr = libc.malloc(C.sizeof(C.c_int) * n)
        self._cbuf = (C.c_int * n).from_address(addr)

    def __del__(self):
        # buffer freed by external library
        libc.free(C.addressof(self._cbuf))
        self._cbuf = None

    @property
    def buffer(self):
        return np.ctypeslib.as_array(self._cbuf)

除了避免复制之外,这还意味着我可以使用 numpy 的索引和赋值语法并将其直接传递给其他 numpy 函数:

wrap = MyWrapper()
buf = wrap.buffer       # buf is now a writeable view of a C-allocated buffer

buf[:] = np.arange(10)  # this is pretty cool!
buf[::2] += 10

print(wrap.buffer)
# [10  1 12  3 14  5 16  7 18  9]

但是,它本身也很危险:

del wrap                # free the pointer

print(buf)              # this is bad!
# [1852404336 1969367156  538978662  538976288  538976288  538976288
#  1752440867 1763734377 1633820787       8548]

# buf[0] = 99           # uncomment this line if you <3 segfaults

为了使这更安全,我需要能够在尝试读取/写入数组内容之前检查底层 C 指针是否已被释放。我对如何做到这一点有一些想法:

  • 一种方法是生成一个包含对属性的np.ndarray引用的子类,检查它是否在对其底层内存进行任何读/写之前,如果是这种情况则引发异常。_cbufMyWrapperNone
  • 我可以轻松地在同一个缓冲区上生成多个视图,例如通过.view强制转换或切片,因此每个视图都需要继承对的引用_cbuf和执行检查的方法。我怀疑这可以通过覆盖来实现__array_finalize__,但我不确定具体如何。
  • 在读取和/或写入数组内容的任何操作之前,还需要调用“指针检查”方法。我对 numpy 的内部结构知之甚少,无法提供详尽的覆盖方法列表。

我如何实现np.ndarray执行此检查的子类?任何人都可以提出更好的方法吗?


更新:这个类做了我想要的大部分:

class SafeBufferView(np.ndarray):

    def __new__(cls, get_buffer, shape=None, dtype=None):
        obj = np.ctypeslib.as_array(get_buffer(), shape).view(cls)
        if dtype is not None:
            obj.dtype = dtype
        obj._get_buffer = get_buffer
        return obj

    def __array_finalize__(self, obj):
        if obj is None: return
        self._get_buffer = getattr(obj, "_get_buffer", None)

    def __array_prepare__(self, out_arr, context=None):
        if not self._get_buffer(): raise Exception("Dangling pointer!")
        return out_arr

    # this seems very heavy-handed - surely there must be a better way?
    def __getattribute__(self, name):
        if name not in ["__new__", "__array_finalize__", "__array_prepare__",
                        "__getattribute__", "_get_buffer"]:
            if not self._get_buffer(): raise Exception("Dangling pointer!")
        return super(np.ndarray, self).__getattribute__(name)

例如:

wrap = MyWrapper()
sb = SafeBufferView(lambda: wrap._cbuf)
sb[:] = np.arange(10)

print(repr(sb))
# SafeBufferView([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], dtype=int32)

print(repr(sb[::2]))
# SafeBufferView([0, 2, 4, 6, 8], dtype=int32)

sbv = sb.view(np.double)
print(repr(sbv))
# SafeBufferView([  2.12199579e-314,   6.36598737e-314,   1.06099790e-313,
#          1.48539705e-313,   1.90979621e-313])

# we have to call the destructor method of `wrap` explicitly - `del wrap` won't
# do anything because `sb` and `sbv` both hold references to `wrap`
wrap.__del__()

print(sb)                # Exception: Dangling pointer!
print(sb + 1)            # Exception: Dangling pointer!
print(sbv)               # Exception: Dangling pointer!
print(np.sum(sb))        # Exception: Dangling pointer!
print(sb.dot(sb))        # Exception: Dangling pointer!

print(np.dot(sb, sb))    # oops...
# -70104698

print(np.extract(np.ones(10), sb))
# array([251019024,     32522, 498870232,     32522,         4,         5,
#               6,         7,        48,         0], dtype=int32)

# np.copyto(sb, np.ones(10, np.int32))    # don't try this at home, kids!

我敢肯定还有其他我错过的边缘情况。


更新 2:正如@ivan_pozdeevweakref.proxy所建议的,我玩过. 这是一个好主意,但不幸的是我看不出它如何与 numpy 数组一起工作。我可以尝试为返回的 numpy 数组创建一个弱引用:.buffer

wrap = MyWrapper()
wr = weakref.proxy(wrap.buffer)
print(wr)
# ReferenceError: weakly-referenced object no longer exists
# <weakproxy at 0x7f6fe715efc8 to NoneType at 0x91a870>

我认为这里的问题是np.ndarray返回的实例wrap.buffer立即超出范围。一种解决方法是让类在初始化时实例化数组,持有对它的强引用,并让.buffer()getter 将 a 返回weakref.proxy到数组:

class MyWrapper2(object):

    def __init__(self, n=10):
        # buffer allocated by external library
        addr = libc.malloc(C.sizeof(C.c_int) * n)
        self._cbuf = (C.c_int * n).from_address(addr)
        self._buffer = np.ctypeslib.as_array(self._cbuf)

    def __del__(self):
        # buffer freed by external library
        libc.free(C.addressof(self._cbuf))
        self._cbuf = None
        self._buffer = None

    @property
    def buffer(self):
        return weakref.proxy(self._buffer)

但是,如果我在仍然分配缓冲区的同时在同一个数组上创建第二个视图,则会中断:

wrap2 = MyWrapper2()
buf = wrap2.buffer
buf[:] = np.arange(10)

buf2 = buf[:]   # create a second view onto the contents of buf

print(repr(buf))
# <weakproxy at 0x7fec3e709b50 to numpy.ndarray at 0x210ac80>
print(repr(buf2))
# array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], dtype=int32)

wrap2.__del__()

print(buf2[:])  # this is bad
# [1291716568    32748 1291716568    32748        0        0        0
#         0       48        0] 

print(buf[:])   # WTF?!
# [34525664        0        0        0        0        0        0        0
#         0        0]  

这被严重破坏了 - 在调用之后,wrap2.__del__()我不仅可以读取和写入buf2一个 numpy 数组视图wrap2._cbuf,而且我什至可以读取和写入buf,这应该是不可能的,因为wrap2.__del__()设置wrap2._bufferNone

4

6 回答 6

8

当任何 numpy 数组存在时,您必须保留对 Wrapper 的引用。实现这一点的最简单方法是将此引用保存在 ctype-buffer 的属性中:

class MyWrapper(object):
    def __init__(self, n=10):
        # buffer allocated by external library
        self.size = n
        self.addr = libc.malloc(C.sizeof(C.c_int) * n)

    def __del__(self):
        # buffer freed by external library
        libc.free(self.addr)

    @property
    def buffer(self):
        buf = (C.c_int * self.size).from_address(self.addr)
        buf._wrapper = self
        return np.ctypeslib.as_array(buf)

这样,当最后一个引用(例如最后一个 numpy 数组)被垃圾收集时,您的包装器就会自动释放。

于 2016-06-28T21:06:19.210 回答
4

它是由第三方编写并以二进制形式分发的专有库。我可以从 C 而不是 Python 调用相同的库函数,但这无济于事,因为我仍然无法访问实际分配和释放缓冲区的代码。例如,我不能自己分配缓冲区,然后将它们作为指针传递给库。

但是,您可以将缓冲区包装在 Python 扩展类型中。这样,您可以只公开您希望可用的接口,并让扩展类型自动处理缓冲区的释放。这样,Python API 就不可能进行空闲内存读/写。


我的缓冲区

#include <python3.3/Python.h>

// Hardcoded values
// N.B. Most of these are only needed for defining the view in the Python
// buffer protocol
static long external_buffer_size = 32;          // Size of buffer in bytes
static long external_buffer_shape[] = { 32 };   // Number of items for each dimension
static long external_buffer_strides[] = { 1 };  // Size of item for each dimension

//----------------------------------------------------------------------------
// Code to simulate the third-party library
//----------------------------------------------------------------------------

// Allocate a new buffer
static void* external_buffer_allocate()
{
    // Allocate the memory
    void* ptr = malloc(external_buffer_size);

    // Debug
    printf("external_buffer_allocate() = 0x%lx\n", (long) ptr);

    // Fill buffer with a recognizable pattern
    int i;
    for (i = 0; i < external_buffer_size; ++i)
    {
        *((char*) ptr + i) = i;
    }

    // Done
    return ptr;
}

// Free an existing buffer
static void external_buffer_free(void* ptr)
{
    // Debug
    printf("external_buffer_free(0x%lx)\n", (long) ptr);

    // Release the memory
    free(ptr);
}


//----------------------------------------------------------------------------
// Define a new Python instance object for the external buffer
// See: https://docs.python.org/3/extending/newtypes.html
//----------------------------------------------------------------------------

typedef struct
{
    // Python macro to include standard members, like reference count
    PyObject_HEAD

    // Base address of allocated memory
    void* ptr;
} BufferObject;


//----------------------------------------------------------------------------
// Define the instance methods for the new object
//----------------------------------------------------------------------------

// Called when there are no more references to the object
static void BufferObject_dealloc(BufferObject* self)
{
    external_buffer_free(self->ptr);
}

// Called when we want a new view of the buffer, using the buffer protocol
// See: https://docs.python.org/3/c-api/buffer.html
static int BufferObject_getbuffer(BufferObject *self, Py_buffer *view, int flags)
{
    // Set the view info
    view->obj = (PyObject*) self;
    view->buf = self->ptr;                      // Base pointer
    view->len = external_buffer_size;           // Length
    view->readonly = 0;
    view->itemsize = 1;
    view->format = "B";                         // unsigned byte
    view->ndim = 1;
    view->shape = external_buffer_shape;
    view->strides = external_buffer_strides;
    view->suboffsets = NULL;
    view->internal = NULL;

    // We need to increase the reference count of our buffer object here, but
    // Python will automatically decrease it when the view goes out of scope
    Py_INCREF(self);

    // Done
    return 0;
}

//----------------------------------------------------------------------------
// Define the struct required to implement the buffer protocol
//----------------------------------------------------------------------------

static PyBufferProcs BufferObject_as_buffer =
{
    // Create new view
    (getbufferproc) BufferObject_getbuffer,

    // Release an existing view
    (releasebufferproc) 0,
};


//----------------------------------------------------------------------------
// Define a new Python type object for the external buffer
//----------------------------------------------------------------------------

static PyTypeObject BufferType =
{
    PyVarObject_HEAD_INIT(NULL, 0)
    "external buffer",                  /* tp_name */
    sizeof(BufferObject),               /* tp_basicsize */
    0,                                  /* tp_itemsize */
    (destructor) BufferObject_dealloc,  /* tp_dealloc */
    0,                                  /* tp_print */
    0,                                  /* tp_getattr */
    0,                                  /* tp_setattr */
    0,                                  /* tp_reserved */
    0,                                  /* tp_repr */
    0,                                  /* tp_as_number */
    0,                                  /* tp_as_sequence */
    0,                                  /* tp_as_mapping */
    0,                                  /* tp_hash  */
    0,                                  /* tp_call */
    0,                                  /* tp_str */
    0,                                  /* tp_getattro */
    0,                                  /* tp_setattro */
    &BufferObject_as_buffer,            /* tp_as_buffer */
    Py_TPFLAGS_DEFAULT,                 /* tp_flags */
    "External buffer",                  /* tp_doc */
    0,                                  /* tp_traverse */
    0,                                  /* tp_clear */
    0,                                  /* tp_richcompare */
    0,                                  /* tp_weaklistoffset */
    0,                                  /* tp_iter */
    0,                                  /* tp_iternext */
    0,                                  /* tp_methods */
    0,                                  /* tp_members */
    0,                                  /* tp_getset */
    0,                                  /* tp_base */
    0,                                  /* tp_dict */
    0,                                  /* tp_descr_get */
    0,                                  /* tp_descr_set */
    0,                                  /* tp_dictoffset */
    (initproc) 0,                       /* tp_init */
    0,                                  /* tp_alloc */
    0,                                  /* tp_new */
};


//----------------------------------------------------------------------------
// Define a Python function to put in the module which creates a new buffer
//----------------------------------------------------------------------------

static PyObject* mybuffer_create(PyObject *self, PyObject *args)
{
    BufferObject* buf = (BufferObject*)(&BufferType)->tp_alloc(&BufferType, 0);
    buf->ptr = external_buffer_allocate();
    return (PyObject*) buf;
}


//----------------------------------------------------------------------------
// Define the set of all methods which will be exposed in the module
//----------------------------------------------------------------------------

static PyMethodDef mybufferMethods[] =
{
    {"create", mybuffer_create, METH_VARARGS, "Create a buffer"},
    {NULL, NULL, 0, NULL}        /* Sentinel */
};


//----------------------------------------------------------------------------
// Define the module
//----------------------------------------------------------------------------

static PyModuleDef mybuffermodule = {
    PyModuleDef_HEAD_INIT,
    "mybuffer",
    "Example module that creates an extension type.",
    -1,
    mybufferMethods
    //NULL, NULL, NULL, NULL, NULL
};


//----------------------------------------------------------------------------
// Define the module's entry point
//----------------------------------------------------------------------------

PyMODINIT_FUNC PyInit_mybuffer(void)
{
    PyObject* m;

    if (PyType_Ready(&BufferType) < 0)
        return NULL;

    m = PyModule_Create(&mybuffermodule);
    if (m == NULL)
        return NULL;

    return m;
}

测试.py

#!/usr/bin/env python3

import numpy as np
import mybuffer

def test():

    print('Create buffer')
    b = mybuffer.create()

    print('Print buffer')
    print(b)

    print('Create memoryview')
    m = memoryview(b)

    print('Print memoryview shape')
    print(m.shape)

    print('Print memoryview format')
    print(m.format)

    print('Create numpy array')
    a = np.asarray(b)

    print('Print numpy array')
    print(repr(a))

    print('Change every other byte in numpy')
    a[::2] += 10

    print('Print numpy array')
    print(repr(a))

    print('Change first byte in memory view')
    m[0] = 42

    print('Print numpy array')
    print(repr(a))

    print('Delete buffer')
    del b

    print('Delete memoryview')
    del m

    print('Delete numpy array - this is the last ref, so should free memory')
    del a

    print('Memory should be free before this line')

if __name__ == '__main__':
    test()

例子

$ gcc -fPIC -shared -o mybuffer.so mybuffer.c -lpython3.3m
$ ./test.py
Create buffer
external_buffer_allocate() = 0x290fae0
Print buffer
<external buffer object at 0x7f7231a2cc60>
Create memoryview
Print memoryview shape
(32,)
Print memoryview format
B
Create numpy array
Print numpy array
array([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15, 16,
       17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31], dtype=uint8)
Change every other byte in numpy
Print numpy array
array([10,  1, 12,  3, 14,  5, 16,  7, 18,  9, 20, 11, 22, 13, 24, 15, 26,
       17, 28, 19, 30, 21, 32, 23, 34, 25, 36, 27, 38, 29, 40, 31], dtype=uint8)
Change first byte in memory view
Print numpy array
array([42,  1, 12,  3, 14,  5, 16,  7, 18,  9, 20, 11, 22, 13, 24, 15, 26,
       17, 28, 19, 30, 21, 32, 23, 34, 25, 36, 27, 38, 29, 40, 31], dtype=uint8)
Delete buffer
Delete memoryview
Delete numpy array - this is the last ref, so should free memory
external_buffer_free(0x290fae0)
Memory should be free before this line
于 2016-07-02T23:48:22.973 回答
2

我喜欢@Vikas 的方法,但是当我尝试它时,我只得到了一个单个FreeOnDel对象的 Numpy 对象数组。以下更简单且有效:

class FreeOnDel(object):
    def __init__(self, data, shape, dtype, readonly=False):
        self.__array_interface__ = {"version": 3,
                                    "typestr": numpy.dtype(dtype).str,
                                    "data": (data, readonly),
                                    "shape": shape}
    def __del__(self):
        data = self.__array_interface__["data"][0]      # integer ptr
        print("do what you want with the data at {}".format(data))

view = numpy.array(FreeOnDel(ptr, shape, dtype), copy=False)

其中ptr是指向数据的整数指针(例如ctypesptr.addressof(...))。

这个__array_interface__属性足以告诉 Numpy 如何将内存区域转换为数组,然后FreeOnDel对象变成该数组的base. 删除数组时,删除会传播到FreeOnDel对象,您可以在其中调用libc.free.

我什至可以称这个FreeOnDel类为“ BufferOwner”,因为这是它的作用:跟踪所有权。

于 2018-05-02T08:59:15.123 回答
1

__del__在将其传递给方法之前,您只需要一个带有附加功能的包装器numpy.ctypeslib.as_array

class FreeOnDel(object):
    def __init__(self, ctypes_ptr):
        # This is not needed if you are dealing with ctypes.POINTER() objects
        # Start of hack for ctypes ARRAY type;
        if not hasattr(ctypes_ptr, 'contents'):
            # For static ctypes arrays, the length and type are stored
            # in the type() rather than object. numpy queries these 
            # properties to find out the shape and type, hence needs to be 
            # copied. I wish type() properties could be automated by 
            # __getattr__ too
            type(self)._length_ = type(ctypes_ptr)._length_
            type(self)._type_ = type(ctypes_ptr)._type_
        # End of hack for ctypes ARRAY type;

        # cannot call self._ctypes_ptr = ctypes_ptr because of recursion
        super(FreeOnDel, self).__setattr__('_ctypes_ptr', ctypes_ptr)

    # numpy.ctypeslib.as_array function sets the __array_interface__
    # on type(ctypes_ptr) which is not called by __getattr__ wrapper
    # Hence this additional wrapper.
    @property
    def __array_interface__(self):
        return self._ctypes_ptr.__array_interface__

    @__array_interface__.setter
    def __array_interface__(self, value):
        self._ctypes_ptr.__array_interface__ = value

    # This is the onlly additional function we need rest all is overhead
    def __del__(self):
        addr = ctypes.addressof(self._ctypes_ptr)
        print("freeing address %x" % addr)
        libc.free(addr)
        # Need to be called on all object members
        # object.__del__(self) does not work
        del self._ctypes_ptr

    def __getattr__(self, attr):
        return getattr(self._ctypes_ptr, attr)

    def __setattr__(self, attr, val):
        setattr(self._ctypes_ptr, attr, val)

去测试

In [32]: import ctypes as C

In [33]: n = 10

In [34]: libc = C.CDLL("libc.so.6")

In [35]: addr = libc.malloc(C.sizeof(C.c_int) * n)

In [36]: cbuf = (C.c_int * n).from_address(addr)

In [37]: wrap = FreeOnDel(cbuf)

In [38]: sb = np.ctypeslib.as_array(wrap, (10,))

In [39]: sb[:] = np.arange(10)

In [40]: print(repr(sb))
array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], dtype=int32)

In [41]: print(repr(sb[::2]))
array([0, 2, 4, 6, 8], dtype=int32)

In [42]: sbv = sb.view(np.double)

In [43]: print(repr(sbv))
array([  2.12199579e-314,   6.36598737e-314,   1.06099790e-313,
         1.48539705e-313,   1.90979621e-313])

In [45]: buf2 = sb[:8]

In [46]: sb[::2] += 10

In [47]: del cbuf   # Memory not freed because this does not have __del__

In [48]: del wrap   # Memory not freed because sb, sbv, buf2 have references

In [49]: del sb     # Memory not freed because sbv, buf have references

In [50]: del buf2   # Memory not freed because sbv has reference

In [51]: del sbv    # Memory freed because no more references
freeing address 2bc6bc0

实际上更简单的解决方案是覆盖__del__函数

In [7]: olddel = getattr(cbuf, '__del__', lambda: 0)

In [8]: cbuf.__del__ = lambda self : libc.free(C.addressof(self)), olddel

In [10]: import numpy as np

In [12]: sb = np.ctypeslib.as_array(cbuf, (10,))

In [13]: sb[:] = np.arange(10)

In [14]: print(repr(sb))
array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], dtype=int32)

In [15]: print(repr(sb))
array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], dtype=int32)

In [16]: print(repr(sb[::2]))
array([0, 2, 4, 6, 8], dtype=int32)

In [17]: sbv = sb.view(np.double)

In [18]: print(repr(sbv))
array([  2.12199579e-314,   6.36598737e-314,   1.06099790e-313,
         1.48539705e-313,   1.90979621e-313])

In [19]: buf2 = sb[:8]

In [20]: sb[::2] += 10

In [22]: del cbuf   # Memory not freed

In [23]: del sb     # Memory not freed because sbv, buf have references

In [24]: del buf2   # Memory not freed because sbv has reference

In [25]: del sbv    # Memory freed because no more references
于 2017-07-19T19:52:46.443 回答
1

weakref是您提议的功能的内置机制。具体来说,weakref.proxy是一个与被引用对象具有相同接口的对象。在引用对象的处置后,代理上的任何操作都会引发weakref.ReferenceError. 你甚至不需要numpy

In [2]: buffer=(c.c_int*100)()   #acts as an example for an externally allocated buffer
In [3]: voidp=c.addressof(buffer)

In [10]: a=(c.c_int*100).from_address(voidp) # python object accessing the buffer.
                 # Here it's created from raw address value. It's better to use function
                 # prototypes instead for some type safety.
In [14]: ra=weakref.proxy(a)

In [15]: a[1]=1
In [16]: ra[1]
Out[16]: 1

In [17]: del a
In [18]: ra[1]
ReferenceError: weakly-referenced object no longer exists

In [20]: buffer[1]
Out[20]: 1

如您所见,在任何情况下,您都需要 C 缓冲区上的普通 Python 对象。如果外部库拥有内存,则必须在 C 级别释放缓冲区之前删除该对象。如果您自己拥有内存,则只需ctypes按正常方式创建一个对象,然后在删除时将其释放。

因此,如果您的外部库拥有内存并且可以随时释放(您的规范对此含糊不清),它必须以某种方式告诉您它即将这样做 - 否则,您无法知道这一点以采取必要的行动。

于 2016-06-28T15:25:00.523 回答
0

如果您可以从 Python 完全控制 C 缓冲区的生命周期,那么您基本上拥有的是一个ndarray应该使用的 Python“缓冲区”对象。

因此,

  • 有两种基本方法可以连接它们:
    • 缓冲区-> ndarray
    • ndarray -> 缓冲区
  • 还有一个问题如何实现缓冲区本身

缓冲区-> ndarray

不安全:bufferndarray. 引入第 3 个对象来保存对两者的引用并没有更好:那么您只需要跟踪第 3 个对象而不是buffer.

ndarray -> 缓冲区

“现在你说!” 因为手头的任务是“ndarray应该使用的缓冲区”?这是自然的方式。

事实上,numpy它有一个内置机制:任何ndarray不拥有其内存的对象都持有对其base属性中的对象的引用(从而防止后者被垃圾收集)。对于视图,属性会相应地自动分配(如果base是,则分配给父对象None或父对象base)。

问题是你不能把任何旧物体放在那里。相反,该属性由构造函数填充,并且建议的对象首先经过其审查。

所以,如果我们可以构造一些numpy.array接受并认为有资格进行内存重用的自定义对象(numpy.ctypeslib.as_array实际上是一个numpy.array(copy=False)带有一些健全性检查的包装器)......

<...>

于 2016-07-06T08:21:38.933 回答