我想分享我自己解决这个问题的经验。我能感受到你的痛苦和困惑,但实际上,如果你知道自己在做什么,考虑到你有大量的选择,实施一个可行的解决方案并不是太难。
客观的
实现一个能够执行两个操作的缓冲区池 -获取和释放。
基本池化策略:
- 获取从池中提取缓冲区有效地将可用缓冲区数量减少 1;
- 如果没有可用的缓冲区,则会出现两个选项:
- 增长池并返回一个新创建的缓冲区;或者
- 创建并返回一个虚拟缓冲区(如下所述)。
- release将缓冲区返回到池中。
池的大小可以是固定的或可变的。“可变”意味着最初有 M 个预分配缓冲区(例如零),并且池可以按需增长到 N。“固定”意味着所有缓冲区都在池创建时预分配 (M = N) .
实现一个回调来获取 libuv 的缓冲区。
除了内存不足的情况外,在任何情况下都不允许无限池增长仍然使池正常工作。
执行
现在,让我们更详细地了解这一切。
池结构:
#define BUFPOOL_CAPACITY 100
typedef struct bufpool_s bufpool_t;
struct bufpool_s {
void *bufs[BUFPOOL_CAPACITY];
int size;
};
size
是当前池大小。
缓冲区本身是一个内存块,前缀为以下结构:
#define bufbase(ptr) ((bufbase_t *)((char *)(ptr) - sizeof(bufbase_t)))
#define buflen(ptr) (bufbase(ptr)->len)
typedef struct bufbase_s bufbase_t;
struct bufbase_s {
bufpool_t *pool;
int len;
};
len
是以字节为单位的缓冲区长度。
新缓冲区的分配如下所示:
void *bufpool_alloc(bufpool_t *pool, int len) {
bufbase_t *base = malloc(sizeof(bufbase_t) + len);
if (!base) return 0;
base->pool = pool;
base->len = len;
return (char *)base + sizeof(bufbase_t);
}
请注意,返回的指针指向标头之后的下一个字节——数据区。这允许拥有缓冲区指针,就好像它们是通过对malloc
.
释放是相反的:
void bufpool_free(void *ptr) {
if (!ptr) return;
free(bufbase(ptr));
}
libuv 的分配回调如下所示:
void alloc_cb(uv_handle_t *handle, size_t size, uv_buf_t *buf) {
int len;
void *ptr = bufpool_acquire(handle->loop->data, &len);
*buf = uv_buf_init(ptr, len);
}
您可以在此处看到,alloc_cb
它从循环上的用户数据指针中获取缓冲池的指针。这意味着缓冲池应在使用之前附加到事件循环。换句话说,您应该在创建循环时初始化一个池并将其指针分配给该data
字段。如果您已经在该字段中保存了其他用户数据,只需扩展您的结构。
虚拟缓冲区是一个假缓冲区,这意味着它并非源自池,但仍具有完整功能。虚拟缓冲区的目的是让整个事情在池饥饿的罕见情况下工作,即当所有缓冲区都被获取并且需要另一个缓冲区时。根据我的研究,在所有现代操作系统上分配大约 8Kb 的小内存块都非常快 - 这非常适合虚拟缓冲区的大小。
#define DUMMY_BUF_SIZE 8000
void *bufpool_dummy() {
return bufpool_alloc(0, DUMMY_BUF_SIZE);
}
获取操作:
void *bufpool_acquire(bufpool_t *pool, int *len) {
void *buf = bufpool_dequeue(pool);
if (!buf) buf = bufpool_dummy();
*len = buf ? buflen(buf) : 0;
return buf;
}
释放操作:
void bufpool_release(void *ptr) {
bufbase_t *base;
if (!ptr) return;
base = bufbase(ptr);
if (base->pool) bufpool_enqueue(base->pool, ptr);
else free(base);
}
这里有两个函数 -bufpool_enqueue
和bufpool_dequeue
. 基本上,他们执行池的所有工作。
在我的例子中,在上面所说的上面有一个 O(1) 的缓冲区索引队列,这使我能够更有效地跟踪池的状态,非常快速地获取缓冲区的索引。没有必要像我做的那样极端,因为池的最大大小是有限的,因此任何数组搜索也将在时间上保持不变。
在最简单的情况下,您可以将这些函数实现为结构中整个bufs
数组的纯线性搜索器bufpool_s
。例如,如果获取了缓冲区,则搜索第一个非 NULL 点,保存指针并将 NULL 放入该点。下次释放缓冲区时,您搜索第一个 NULL 点并将其指针保存在那里。
池内部如下:
#define BUF_SIZE 64000
void *bufpool_grow(bufpool_t *pool) {
int idx = pool->size;
void *buf;
if (idx == BUFPOOL_CAPACITY) return 0;
buf = bufpool_alloc(pool, BUF_SIZE);
if (!buf) return 0;
pool->bufs[idx] = 0;
pool->size = idx + 1;
return buf;
}
void bufpool_enqueue(bufpool_t *pool, void *ptr) {
int idx;
for (idx = 0; idx < pool->size; ++idx) {
if (!pool->bufs[idx]) break;
}
assert(idx < pool->size);
pool->bufs[idx] = ptr;
}
void *bufpool_dequeue(bufpool_t *pool) {
int idx;
void *ptr;
for (idx = 0; idx < pool->size; ++idx) {
ptr = pool->bufs[idx];
if (ptr) {
pool->bufs[idx] = 0;
return ptr;
}
}
return bufpool_grow(pool);
}
正常的缓冲区大小是 64000 字节,因为我希望它能够舒适地放入带有标头的 64Kb 块中。
最后,初始化和反初始化例程:
void bufpool_init(bufpool_t *pool) {
pool->size = 0;
}
void bufpool_done(bufpool_t *pool) {
int idx;
for (idx = 0; idx < pool->size; ++idx) bufpool_free(pool->bufs[idx]);
}
请注意,为了说明的目的,这个实现被简化了。这里没有减少池的政策,而在现实世界的场景中,很可能需要它。
用法
你现在应该可以编写你的 libuv 回调了:
void read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
/* ... */
bufpool_release(buf->base); /* Release the buffer */
}
循环初始化:
uv_loop_t *loop = malloc(sizeof(*loop));
bufpool_t *pool = malloc(sizeof(*pool));
uv_loop_init(loop);
bufpool_init(pool);
loop->data = pool;
手术:
uv_tcp_t *tcp = malloc(sizeof(*tcp));
uv_tcp_init(tcp);
/* ... */
uv_read_start((uv_handle_t *)tcp, alloc_cb, read_cb);
更新(2016 年 8 月 2 日)
在根据请求的大小获取缓冲区时使用自适应策略也是一个好主意,并且仅在请求大量数据时(例如所有读取和长时间写入)才返回池化缓冲区。对于其他情况(例如大多数写入),返回虚拟缓冲区。这将有助于避免浪费池化缓冲区,同时保持可接受的分配速度。例如:
void alloc_cb(uv_handle_t *handle, size_t size, uv_buf_t *buf) {
int len = size; /* Requested buffer size */
void *ptr = bufpool_acquire(handle->loop->data, &len);
*buf = uv_buf_init(ptr, len);
}
void *bufpool_acquire(bufpool_t *pool, int *len) {
int size = *len;
if (size > DUMMY_BUF_SIZE) {
buf = bufpool_dequeue(pool);
if (buf) {
if (size > BUF_SIZE) *len = BUF_SIZE;
return buf;
}
size = DUMMY_BUF_SIZE;
}
buf = bufpool_alloc(0, size);
*len = buf ? size : 0;
return buf;
}
PS不需要buflen
和bufpool_dummy
这个片段。