18

如何实现一个 FIFO 缓冲区,我可以有效地将任意大小的字节块添加到头部,并从中有效地从尾部弹出任意大小的字节块?

背景:

我有一个类,它以任意大小的块从类似文件的对象中读取字节,并且它本身就是一个类似文件的对象,客户端可以从中读取任意大小的块中的字节。

我实现这个的方式是,每当客户端想要读取一个字节块时,该类将重复从底层的类似文件的对象(具有适合这些对象的块大小)中读取并将字节添加到 FIFO 的头部排队,直到队列中有足够的字节来为客户端提供请求大小的块。然后它从队列的尾部弹出这些字节并将它们返回给客户端。

当底层类文件对象的块大小远大于客户端从类中读取时使用的块大小时,会出现性能问题。

假设底层类文件对象的块大小为 1 MiB,客户端读取的块大小为 1 KiB。客户端第一次请求 1 KiB 时,该类必须读取 1 MiB 并将其添加到 FIFO 队列中。然后,对于该请求和随后的 1023 个请求,该类必​​须从 FIFO 队列的尾部弹出 1 KiB,该队列的大小逐渐从 1 MiB 减小到 0 字节,此时循环再次开始。

我目前已经使用 StringIO 对象实现了这一点。将新字节写入 StringIO 对象的末尾很快,但从开头删除字节非常慢,因为必须创建一个新的 StringIO 对象,该对象保存整个先前缓冲区的副本减去第一个字节块。

处理类似问题的 SO 问题往往指向双端队列容器。然而,双端队列被实现为双向链表。将块写入双端队列需要将块拆分为对象,每个对象包含一个字节。然后,双端队列将添加两个指向每个对象的指针以进行存储,与字节相比,内存需求可能至少增加了一个数量级。此外,遍历链表并处理每个对象以将块拆分为对象和将对象连接为块都需要很长时间。

4

4 回答 4

16

我目前已经使用 StringIO 对象实现了这一点。将新字节写入 StringIO 对象的末尾很快,但从开头删除字节非常慢,因为必须创建一个新的 StringIO 对象,该对象保存整个先前缓冲区的副本减去第一个字节块。

实际上,实现 FIFO 的最典型方法是使用两个指针环绕缓冲区,如下所示:

在此处输入图像描述 图片来源

现在,您可以StringIO()使用.seek()从适当的位置读/写来实现它。

于 2012-06-06T16:05:00.507 回答
12

更新:这是来自vartec 答案的循环缓冲区技术的实现(基于我的原始答案,为好奇的人保留在下面):

from cStringIO import StringIO

class FifoFileBuffer(object):
    def __init__(self):
        self.buf = StringIO()
        self.available = 0    # Bytes available for reading
        self.size = 0
        self.write_fp = 0

    def read(self, size = None):
        """Reads size bytes from buffer"""
        if size is None or size > self.available:
            size = self.available
        size = max(size, 0)

        result = self.buf.read(size)
        self.available -= size

        if len(result) < size:
            self.buf.seek(0)
            result += self.buf.read(size - len(result))

        return result


    def write(self, data):
        """Appends data to buffer"""
        if self.size < self.available + len(data):
            # Expand buffer
            new_buf = StringIO()
            new_buf.write(self.read())
            self.write_fp = self.available = new_buf.tell()
            read_fp = 0
            while self.size <= self.available + len(data):
                self.size = max(self.size, 1024) * 2
            new_buf.write('0' * (self.size - self.write_fp))
            self.buf = new_buf
        else:
            read_fp = self.buf.tell()

        self.buf.seek(self.write_fp)
        written = self.size - self.write_fp
        self.buf.write(data[:written])
        self.write_fp += len(data)
        self.available += len(data)
        if written < len(data):
            self.write_fp -= self.size
            self.buf.seek(0)
            self.buf.write(data[written:])
        self.buf.seek(read_fp)

原始答案(由上述答案取代):

您可以使用缓冲区并跟踪起始索引(读取文件指针),当它变得太大时偶尔压缩它(这应该会产生相当好的摊销性能)。

例如,像这样包装一个 StringIO 对象:

from cStringIO import StringIO
class FifoBuffer(object):
    def __init__(self):
        self.buf = StringIO()

    def read(self, *args, **kwargs):
        """Reads data from buffer"""
        self.buf.read(*args, **kwargs)

    def write(self, *args, **kwargs):
        """Appends data to buffer"""
        current_read_fp = self.buf.tell()
        if current_read_fp > 10 * 1024 * 1024:
            # Buffer is holding 10MB of used data, time to compact
            new_buf = StringIO()
            new_buf.write(self.buf.read())
            self.buf = new_buf
            current_read_fp = 0

        self.buf.seek(0, 2)    # Seek to end
        self.buf.write(*args, **kwargs)

        self.buf.seek(current_read_fp)
于 2012-06-06T15:55:23.470 回答
9

...但是从头开始删除字节非常慢,因为必须创建一个新的 StringIO 对象,该对象包含整个先前缓冲区的副本减去第一个字节块。

这种类型的缓慢可以通过bytearray在 Python>=v3.4 中使用来克服。请参阅此问题中的讨论,补丁在此处

关键是:从bytearrayby中删除头字节

a[:1] = b''   # O(1) (amortized)

a = a[1:]     # O(len(a))

什么时候len(a)很大(比如 10**6)。

与需要将对象连接成块的双端队列容器相比,bytearray它还为您提供了一种将整个数据集预览为数组(即其本身)​​的便捷方式。

现在可以按如下方式实现高效的 FIFO

class byteFIFO:
    """ byte FIFO buffer """
    def __init__(self):
        self._buf = bytearray()

    def put(self, data):
        self._buf.extend(data)

    def get(self, size):
        data = self._buf[:size]
        # The fast delete syntax
        self._buf[:size] = b''
        return data

    def peek(self, size):
        return self._buf[:size]

    def getvalue(self):
        # peek with no copy
        return self._buf

    def __len__(self):
        return len(self._buf)

基准

import time
bfifo = byteFIFO()
bfifo.put(b'a'*1000000)        # a very long array
t0 = time.time()
for k in range(1000000):
    d = bfifo.get(4)           # "pop" from head
    bfifo.put(d)               # "push" in tail
print('t = ', time.time()-t0)  # t = 0.897 on my machine

Cameron 回答中的循环/环形缓冲区实现需要 2.378 秒,而他/她的原始实现需要 1.108 秒。

于 2019-09-01T18:31:17.303 回答
4

你能假设一下预期的读/写量吗?

例如,将数据分块为 1024 字节片段并使用deque[1] 可能会更好;您可以只读取 N 个完整的块,然后将最后一个块拆分并将剩余部分放回队列的开头。

1)集合.deque

class collections.deque([iterable[, maxlen]])

返回一个新的双端队列对象,从左到右初始化(使用 append()),其中包含来自可迭代的数据。如果未指定 iterable,则新的双端队列为空。

双端队列是堆栈和队列的概括(名称发音为“deck”,是“双端队列”的缩写)。双端队列支持从双端队列的任一侧进行线程安全、内存高效的追加和弹出操作,在任一方向上的 O(1) 性能大致相同。...

于 2012-06-06T15:50:54.990 回答