我正在为使用共享内存缓冲区存储其内部状态的 C 库编写 Python 绑定。这些缓冲区的分配和释放是由库本身在 Python 之外完成的,但我可以通过从 Python 中调用包装的构造函数/析构函数来间接控制何时发生这种情况。我想向 Python 公开一些缓冲区,以便我可以从中读取,并在某些情况下将值推送给它们。性能和内存使用是重要的问题,所以我想尽可能避免复制数据。
我目前的方法是创建一个 numpy 数组,它可以直接查看 ctypes 指针:
import numpy as np
import ctypes as C
libc = C.CDLL('libc.so.6')
class MyWrapper(object):
def __init__(self, n=10):
# buffer allocated by external library
addr = libc.malloc(C.sizeof(C.c_int) * n)
self._cbuf = (C.c_int * n).from_address(addr)
def __del__(self):
# buffer freed by external library
libc.free(C.addressof(self._cbuf))
self._cbuf = None
@property
def buffer(self):
return np.ctypeslib.as_array(self._cbuf)
除了避免复制之外,这还意味着我可以使用 numpy 的索引和赋值语法并将其直接传递给其他 numpy 函数:
wrap = MyWrapper()
buf = wrap.buffer # buf is now a writeable view of a C-allocated buffer
buf[:] = np.arange(10) # this is pretty cool!
buf[::2] += 10
print(wrap.buffer)
# [10 1 12 3 14 5 16 7 18 9]
但是,它本身也很危险:
del wrap # free the pointer
print(buf) # this is bad!
# [1852404336 1969367156 538978662 538976288 538976288 538976288
# 1752440867 1763734377 1633820787 8548]
# buf[0] = 99 # uncomment this line if you <3 segfaults
为了使这更安全,我需要能够在尝试读取/写入数组内容之前检查底层 C 指针是否已被释放。我对如何做到这一点有一些想法:
- 一种方法是生成一个包含对属性的
np.ndarray
引用的子类,检查它是否在对其底层内存进行任何读/写之前,如果是这种情况则引发异常。_cbuf
MyWrapper
None
- 我可以轻松地在同一个缓冲区上生成多个视图,例如通过
.view
强制转换或切片,因此每个视图都需要继承对的引用_cbuf
和执行检查的方法。我怀疑这可以通过覆盖来实现__array_finalize__
,但我不确定具体如何。 - 在读取和/或写入数组内容的任何操作之前,还需要调用“指针检查”方法。我对 numpy 的内部结构知之甚少,无法提供详尽的覆盖方法列表。
我如何实现np.ndarray
执行此检查的子类?任何人都可以提出更好的方法吗?
更新:这个类做了我想要的大部分:
class SafeBufferView(np.ndarray):
def __new__(cls, get_buffer, shape=None, dtype=None):
obj = np.ctypeslib.as_array(get_buffer(), shape).view(cls)
if dtype is not None:
obj.dtype = dtype
obj._get_buffer = get_buffer
return obj
def __array_finalize__(self, obj):
if obj is None: return
self._get_buffer = getattr(obj, "_get_buffer", None)
def __array_prepare__(self, out_arr, context=None):
if not self._get_buffer(): raise Exception("Dangling pointer!")
return out_arr
# this seems very heavy-handed - surely there must be a better way?
def __getattribute__(self, name):
if name not in ["__new__", "__array_finalize__", "__array_prepare__",
"__getattribute__", "_get_buffer"]:
if not self._get_buffer(): raise Exception("Dangling pointer!")
return super(np.ndarray, self).__getattribute__(name)
例如:
wrap = MyWrapper()
sb = SafeBufferView(lambda: wrap._cbuf)
sb[:] = np.arange(10)
print(repr(sb))
# SafeBufferView([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], dtype=int32)
print(repr(sb[::2]))
# SafeBufferView([0, 2, 4, 6, 8], dtype=int32)
sbv = sb.view(np.double)
print(repr(sbv))
# SafeBufferView([ 2.12199579e-314, 6.36598737e-314, 1.06099790e-313,
# 1.48539705e-313, 1.90979621e-313])
# we have to call the destructor method of `wrap` explicitly - `del wrap` won't
# do anything because `sb` and `sbv` both hold references to `wrap`
wrap.__del__()
print(sb) # Exception: Dangling pointer!
print(sb + 1) # Exception: Dangling pointer!
print(sbv) # Exception: Dangling pointer!
print(np.sum(sb)) # Exception: Dangling pointer!
print(sb.dot(sb)) # Exception: Dangling pointer!
print(np.dot(sb, sb)) # oops...
# -70104698
print(np.extract(np.ones(10), sb))
# array([251019024, 32522, 498870232, 32522, 4, 5,
# 6, 7, 48, 0], dtype=int32)
# np.copyto(sb, np.ones(10, np.int32)) # don't try this at home, kids!
我敢肯定还有其他我错过的边缘情况。
更新 2:正如@ivan_pozdeevweakref.proxy
所建议的,我玩过. 这是一个好主意,但不幸的是我看不出它如何与 numpy 数组一起工作。我可以尝试为返回的 numpy 数组创建一个弱引用:.buffer
wrap = MyWrapper()
wr = weakref.proxy(wrap.buffer)
print(wr)
# ReferenceError: weakly-referenced object no longer exists
# <weakproxy at 0x7f6fe715efc8 to NoneType at 0x91a870>
我认为这里的问题是np.ndarray
返回的实例wrap.buffer
立即超出范围。一种解决方法是让类在初始化时实例化数组,持有对它的强引用,并让.buffer()
getter 将 a 返回weakref.proxy
到数组:
class MyWrapper2(object):
def __init__(self, n=10):
# buffer allocated by external library
addr = libc.malloc(C.sizeof(C.c_int) * n)
self._cbuf = (C.c_int * n).from_address(addr)
self._buffer = np.ctypeslib.as_array(self._cbuf)
def __del__(self):
# buffer freed by external library
libc.free(C.addressof(self._cbuf))
self._cbuf = None
self._buffer = None
@property
def buffer(self):
return weakref.proxy(self._buffer)
但是,如果我在仍然分配缓冲区的同时在同一个数组上创建第二个视图,则会中断:
wrap2 = MyWrapper2()
buf = wrap2.buffer
buf[:] = np.arange(10)
buf2 = buf[:] # create a second view onto the contents of buf
print(repr(buf))
# <weakproxy at 0x7fec3e709b50 to numpy.ndarray at 0x210ac80>
print(repr(buf2))
# array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], dtype=int32)
wrap2.__del__()
print(buf2[:]) # this is bad
# [1291716568 32748 1291716568 32748 0 0 0
# 0 48 0]
print(buf[:]) # WTF?!
# [34525664 0 0 0 0 0 0 0
# 0 0]
这被严重破坏了 - 在调用之后,wrap2.__del__()
我不仅可以读取和写入buf2
一个 numpy 数组视图wrap2._cbuf
,而且我什至可以读取和写入buf
,这应该是不可能的,因为wrap2.__del__()
设置wrap2._buffer
为None
。