0

我做了一个程序,它有几个线程客户端,可以将信息存储到循环缓冲区中;服务器可以从循环缓冲区中读取(和“删除”)消息。我遇到的问题是缓冲区已满;当前正在写入缓冲区的客户端必须释放互斥锁(服务器会从缓冲区中删除所有数据),但是当释放互斥锁时,客户端竞争和已经很忙的客户端不一定是第一个继续传输的客户端;所以他的信息被混淆了。有谁知道如何解决这个问题?或者你有更好的方法来做这件事的建议。最终,我想创建一个多线程 tcp 服务器,将消息存储在缓冲区中,然后由服务器重新传输给所有客户端,这样你就知道我要用它做什么了。

#include <stdio.h>
#include <malloc.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>

struct cBuf{
    int     size;   /* maximum number of slots */
    int     start;  /* index of oldest element (last element) */
    int     end;    /* index at which to write new element (first element) */
    char    *buf;  /* buffer array */
};

struct cBuf cb;

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

/* Initialize buffer*/
void buf_Init(struct cBuf *cb, int size) {
    cb->size  = size + 1; /* empty element */
    cb->start = 0;
    cb->end   = 0; 
    cb->buf = (char *)calloc(cb->size, sizeof(char)); //pointer to beginning of allocated memory
}
/* Clean up buffer*/
void buf_Free(struct cBuf *cb) {
    free(cb->buf);
}

/* Is the the buffer full? if it equals size meaning the buffer is full: end will point to zero (ex. 7%7 is 0) */ 
int buf_IsFull(struct cBuf *cb) {
    return (cb->end + 1) % cb->size == cb->start; 
}

/* Is the buffer empty? Yes if both start & end are zero (are equal) */
int cbIsEmpty(struct cBuf *cb) {
    return cb->end == cb->start; 
}

/* Write an element, overwriting oldest element if buffer is full. App can
   choose to avoid the overwrite by checking cbIsFull(). */
/* When inserting an element acces needs to be blocked (only 1 insert at a time =>mutexes 
 * When buffer is full it should return -1 so we can block the request*/
int buf_Insert(struct cBuf *cb, char *elem) {
    if (buf_IsFull(cb)){
        //buffer is full; cannot write into the buffer
        //sleep(2);
        return 0;
    } else {
        /* When writing to the buffer, the buffer has to be locked first (will block when already locked)*/
        pthread_mutex_lock(&mutex);
        cb->buf[cb->end] = *elem; //we put the new character in the last free slot
        /* we move the index for the next slot one further */
        cb->end = (cb->end + 1) % cb->size; 
        pthread_mutex_unlock(&mutex);
        return 1;
    }
}

/* Read oldest element. Ensure !cbIsEmpty() first. */
int buf_Read(struct cBuf *cb, char *out) {
    if (!cbIsEmpty(cb)){
        /* When reading from the buffer, the buffer has to be locked first (will block when already locked)*/
        pthread_mutex_lock(&mutex);
        *out = cb->buf[cb->start];
        cb->start = (cb->start + 1) % cb->size; //are news oldest character is now the next one
        pthread_mutex_unlock(&mutex);
        return 1;
    }
    //buffer is emtpy
    return 0;
}

void * client(void *cb){
    //sleep (2);
    #ifdef DEBUG
        printf("\nDEBUG (Thread ID:%lu) - Entering thread\t",(unsigned long)pthread_self());
    #endif

    /* main thread does not need to wait and this way all resources are released when terminating*/
    pthread_detach(pthread_self());

    struct cBuf *myData;
    myData = (struct cBuf*) cb;
    int i;

    /* client will post message in the buffer */
    char input[]="Hello World!";

    //printf("I:\t");
    for (i=0; i < sizeof(input); ++ i){ 
        if (buf_Insert(myData, &input[i]) ){
            //printf("[I]%c",input[i]);
        } else {
            //the buffer is full; wait until it has been emptied + go back a step because of lost character
            i--;    
            #ifdef DEBUG
            printf("\nDEBUG (Main Thread) - Full Buffer");  
            #endif
        }
    }
    return 0;
}

int main(void) {
    int bufferSize = 20; /* arbitrary size */
    char out;
    pthread_t thread;
    int j;

    buf_Init(&cb, bufferSize);

    //we make 1 client for starters
    for (j = 0; j<4; j++){
        if(pthread_create (&thread,NULL, client, (void *) &cb) !=0){
            #ifdef DEBUG
            printf("\nDEBUG (Main Thread) - Error while creating thread");
            #endif
        } else {
            #ifdef DEBUG
            printf("\nDEBUG (Main Thread) - Thread created");
            #endif
        }
    }

    while (1){
        /* Server will read (and thus remove) the message */
        if (cbIsEmpty(&cb)){
            //sleep(2);
            #ifdef DEBUG
            printf("\nDEBUG (Main Thread) - Buffer is empty");
            #endif
        } else {
            printf("\tO:\t");
            while (buf_Read(&cb,&out)){
                printf("%c", out);
            }
            #ifdef DEBUG        
            if (buf_Read(&cb, &out) == 0){
                printf("\nDEBUG (Main Thread) - Finished reading");
            }
            #endif
            printf("\n");   
        }
    }

    //empty the buffer; free the allocated memory
    buf_Free(&cb);
    return 0;
}
4

1 回答 1

0

您所描述的是一个典型的用例,除了有选择地发出信号(和检查)线程pthread_cond_t之外,您还可以使用它。pthread_mutex_t

于 2012-05-27T17:40:53.243 回答