0

需要帮助才能使以下工作。

我有多个生产者线程(每个都写 100 字节的数据)到 ringbuffer。和一个单一的读者(消费者)线程,一次读取 100 个字节并写入标准输出。(最后我想根据数据写入文件)

通过这个实现,我有时会从环形缓冲区读取数据错误。见下文由于环形缓冲区的大小很小,它会变满并且部分数据丢失。这不是我目前的问题。

** 问题:

  1. 在打印从 ringbuffer 读取的数据时,一些数据被交换!我找不到错误。
  2. 逻辑/方法是否正确?(或)有没有更好的方法来做到这一点

环形缓冲区.h

#define RING_BUFFER_SIZE  500
struct ringbuffer
{
    char *buffer;
    int wr_pointer;
    int rd_pointer;
    int size;
    int fill_count;
};

环形缓冲区.c

#include <stdio.h>
#include <stdlib.h> 
#include <string.h>
#include "ringbuffer.h"

int init_ringbuffer(char *rbuffer, struct ringbuffer *rb, size_t size)
{
    rb->buffer = rbuffer;
    rb->size = size;
        rb->rd_pointer = 0;
        rb->wr_pointer = 0; 
        rb->fill_count = 0;
    return 0;
}

int rb_get_free_space (struct ringbuffer *rb)
{ 
    return (rb->size -  rb->fill_count);
}

int rb_write (struct ringbuffer *rb, unsigned char * buf, int len)
{
    int availableSpace;
    int i;

    availableSpace = rb_get_free_space(rb);
    printf("In Write AVAIL SPC=%d\n",availableSpace);
    /* Check if Ring Buffer is FULL */
    if(len > availableSpace)
    {
       printf("NO SPACE TO WRITE - RETURN\n");
       return -1;
    }

    i = rb->wr_pointer;
    if(i == rb->size) //At the end of Buffer 
    {
       i = 0;
    }    
    else if (i + len > rb->size)
    {
        memcpy(rb->buffer + i, buf, rb->size - i);
        buf += rb->size - i;
        len = len - (rb->size - i);
        rb->fill_count += len;
        i = 0;
    }
    memcpy(rb->buffer + i, buf, len);
    rb->wr_pointer = i + len;
    rb->fill_count += len;

    printf("w...rb->write=%tx\n", rb->wr_pointer );
    printf("w...rb->read=%tx\n", rb->rd_pointer );
    printf("w...rb->fill_count=%d\n", rb->fill_count );
    return 0;
}

int rb_read (struct ringbuffer *rb, unsigned char * buf, int max)
{
    int i;

    printf("In Read,Current DATA size in RB=%d\n",rb->fill_count);
    /* Check if Ring Buffer is EMPTY */
    if(max > rb->fill_count) 
    {
      printf("In Read, RB EMPTY - RETURN\n");
      return  -1; 
    }  

    i = rb->rd_pointer;
    if (i == rb->size)
    {
       i = 0;
    }
    else if(i + max > rb->size)
    {
        memcpy(buf, rb->buffer + i, rb->size - i);
        buf += rb->size - i;
        max = max - (rb->size - i);
        rb->fill_count -= max;
        i = 0;
    }
    memcpy(buf, rb->buffer + i, max);
    rb->rd_pointer = i + max;
    rb->fill_count -= max;

    printf("r...rb->write=%tx\n", rb->wr_pointer );
    printf("r...rb->read=%tx\n", rb->rd_pointer );
    printf("DATA READ ---> %s\n",(char *)buf);
    printf("r...rb->fill_count=%d\n", rb->fill_count );
    return 0;
}
4

3 回答 3

0

(作者可能会晚一点,但如果其他人搜索“多生产者单一消费者”)

我认为该实现中的基本问题是 rb_write 修改了全局状态(rb->fill_count 和其他 rb->XX),而无需在多个写入器之间进行任何同步。

有关替代想法,请查看:http ://www.linuxjournal.com/content/lock-free-multi-producer-multi-consumer-queue-ring-buffer 。

于 2014-11-23T11:56:37.327 回答
0

至于你的问题—— 1. 我也找不到那个错误——事实上,我已经尝试过你的代码并且没有看到这种行为。2. 你问这是否逻辑/方法正确——好吧,就目前而言,这确实实现了一种环形缓冲区。您的测试用例恰好是大小的整数倍,并且记录大小是恒定的,因此这不是最好的测试。

在尝试你的代码时,我发现有很多线程饥饿——第一个运行的生产者线程(最后一个创建的)非常困难,在第一次 5 次之后尝试并失败,将东西填充到缓冲区中,而不是给出消费者线程有机会运行(甚至启动)。然后,当消费者线程启动时,它会在释放 cpu 之前保持相当长的一段时间,并且下一个生产者线程最终会启动。这就是它在我的机器上的工作方式——我敢肯定,它在不同的机器上会有所不同。

太糟糕了,您当前的代码没有办法结束——创建 10 或 100 的 MB 文件......很难涉足。

于 2012-10-25T16:52:47.283 回答
0

在生产者处,您还需要等待条件变量的has empty space条件。这两个条件变量都应该无条件地发出信号,即当消费者从环形缓冲区中删除一个元素时,它应该向生产者发出信号;当生产者将某些东西放入缓冲区时,它应该向消费者发出信号。此外,我会将这个等待/信令逻辑移动到 rb_read 和 rb_write 实现中,因此您的环形缓冲区对于您的程序的其余部分来说是一个“完整的使用解决方案”。

于 2012-10-25T14:54:14.020 回答