7

I'm developing a client which will receive the [EEG] data over tcp and write it to the ring buffer. I thought it can be very convenient to have the buffer as a ctypes or numpy array because it's possible to create a numpy 'view' to any location of such buffer and read/write/process the data without any copying operations. Or is it a bad idea in general?

However, I don't see how to implement a circular buffer of a fixed size this way. Suppose I have created a buffer object which is contiguous in memory. What is the best way to write the data when the end of the buffer is reached?

One possible way is to start overwriting the (already old) bytes from the begining when the write pointer reaches the end of the buffer array. Near the boundaries, however, the numpy view of some chunk (for processing) can't be created (or can it?) in this case, because some of it can still be located in the end of the buffer array while another already in its begining. I've read it's impossible to create such circular slices. How to solve this?

UPD: Thanks everybody for the answers. In case somebody also faces the same problem, here's the final code I've got.

4

4 回答 4

7

如果您需要一个 N 字节的窗口,请将缓冲区设为 2*N 字节并将所有输入写入两个位置:i % Ni % N + N,其中i是字节计数器。这样,缓冲区中总是有 N 个连续字节。

data = 'Data to buffer'
N = 4
buf = 2*N*['\00']

for i,c in enumerate(data):
    j = i % N
    buf[j] = c
    buf[j+N] = c
    if i >= N-1:
        print ''.join(buf[j+1:j+N+1]) 

印刷

Data
ata 
ta t
a to
 to 
to b
o bu
 buf
buff
uffe
ffer
于 2012-01-18T17:54:09.660 回答
2

我认为你需要在这里从 C 风格的思维中退后一步。为每次插入更新环形缓冲区永远不会有效。环形缓冲区与 numpy 数组所需的连续内存块接口根本不同;包括你提到你想做的fft。

一个自然的解决方案是为了性能而牺牲一点内存。例如,如果您需要在缓冲区中保存的元素数量为 N,则分配一个 N+1024(或一些合理的数字)的数组。然后,您只需要在每 1024 次插入中移动 N 个元素,并且您始终拥有 N 个元素的连续视图以直接可用。

编辑:这是实现上述内容的代码片段,并且应该提供良好的性能。但请注意,建议您以块的形式追加,而不是按元素追加。否则,无论您如何实现环形缓冲区,使用 numpy 的性能优势都会很快失效。

import numpy as np

class RingBuffer(object):
    def __init__(self, size, padding=None):
        self.size = size
        self.padding = size if padding is None else padding
        self.buffer = np.zeros(self.size+self.padding)
        self.counter = 0

    def append(self, data):
        """this is an O(n) operation"""
        data = data[-self.padding:]
        n = len(data)
        if self.remaining < n: self.compact()
        self.buffer[self.counter+self.size:][:n] = data
        self.counter += n

    @property
    def remaining(self):
        return self.padding-self.counter
    @property
    def view(self):
        """this is always an O(1) operation"""
        return self.buffer[self.counter:][:self.size]
    def compact(self):
        """
        note: only when this function is called, is an O(size) performance hit incurred,
        and this cost is amortized over the whole padding space
        """
        print 'compacting'
        self.buffer[:self.size] = self.view
        self.counter = 0

rb = RingBuffer(10)
for i in range(4):
    rb.append([1,2,3])
    print rb.view

rb.append(np.arange(15))
print rb.view  #test overflow
于 2014-07-20T20:40:07.943 回答
2

一种可能的方法是当写指针到达缓冲区数组的末尾时,从头开始覆盖(已经旧的)字节。

这是固定大小的环形缓冲区中的唯一选择。

我读过不可能创建这样的圆形切片。

这就是为什么我不会使用 Numpy 视图来执行此操作的原因。相反,您可以创建一个class包装器ndarray,保存缓冲区/数组、容量和指向插入点的指针(索引)。如果要将内容作为 Numpy 数组获取,则必须像这样复制:

buf = np.array([1,2,3,4])
indices = [3,0,1,2]
contents = buf[indices]    # copy

__setitem__如果您实现和 ,您仍然可以就地设置元素的值__setslice__

于 2012-01-18T11:22:09.510 回答
-1

@Janne Karila 的答案的变体,对于 C 但不是 numpy:
如果环形缓冲区非常宽,例如 N x 1G,那么不要将整个事物加倍,而是将 2*N 指向其行的指针数组加倍。例如对于 N=3,初始化

bufp = { buf[0], buf[1], buf[2], buf[0], buf[1], buf[2] };

然后你只写一次数据,并按时间顺序anyfunc( bufp[j:j+3] )查看行buf

于 2014-07-20T10:00:15.807 回答