5

我有一个程序,我试图在其中实现多生产者、多消费者设置。当我有一个消费者和多个生产者时,我的代码似乎运行良好,但引入多个消费者线程似乎会引发一些奇怪的问题。

这是我现在拥有的:

#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>

#define MAX 10

typedef struct travs {
    int id;
    int numBags;
    int arrTime;
    struct travs *next;
} travs;


travs *queue;
//travs *servicing[MAX];

int produced; // The total # of produced in the queue

pthread_mutex_t queue_lock;
//pthread_mutex_t staff_lock;
pthread_cond_t ct, cs;

int CheckIn(){
    sleep(1);
    if(produced != 0) return 1;
    else return 0;
}



void *producerThread(void *args){
    travs *traveler = (travs *)args;
    // Acquire the mutex
    pthread_mutex_lock(&queue_lock);
    produced++;
//  pthread_cond_signal(&cs);
    pthread_cond_wait(&ct, &queue_lock);
    printf("Producer %d is now checked in at time %d.\n", queue->id, (1+queue-    >arrTime));
    queue = queue->next;
    pthread_mutex_unlock(&queue_lock);

    return; 
}       

int Producer(int id, int numBags, int arrTime){

    int ret;
    pthread_t ttid;
    travs *traveler = malloc(sizeof(travs));
    traveler->id = id;
    traveler->numBags = numBags;
    traveler->arrTime = arrTime;
    sleep(arrTime); 
    pthread_mutex_lock(&queue_lock);
    if(queue != NULL) {
        travs *check_in = malloc(sizeof(travs));
        check_in = queue;
        while(check_in->next != NULL){
            check_in = check_in->next;
        }
        check_in->next = traveler;
    }
    else { queue = traveler; }
    pthread_mutex_unlock(&queue_lock);
    // Create a new traveler thread
    ret = pthread_create(&ttid, NULL, producerThread, (void *)traveler);

    // Check if thread creation was successful
    if(ret == 0) {
        printf("Producer %d has entered the check-in line at time %d; s/he is at     position %d and has %d bags.\n", id, arrTime, produced, numBags);
        pthread_cond_signal(&cs);
        return 0;
    }
    else return -1;

}


void *consumerThread(void *arg){

    int i = 0; // travelers serviced
    char *name = (char *)arg;
    while(1) { // run iteratively

        // If 20 producers have been served, the consumer's work is done.
        if(i == 20) {
            printf("Consumer %s's service has completed!\n", name);
                pthread_exit(NULL);
            }
        // Sleep for 10s if 5 travelers have been checked in
        if (((i+1) % 5) == 0) {
                // Wake up sleeping travelers
                printf("Consumer %s is taking a break.\n", name);
                sleep(2);
                printf("Consumer %s's break is over.\n", name);
        }

        if(CheckIn()) {
            pthread_mutex_lock(&queue_lock);
            int j = 1;
                    pthread_cond_wait(&cs, &queue_lock);
                    printf("Producer %d presents ticket to consumer     %s.\n", queue->id, name);
                    printf("Consumer %s gives boarding pass to producer     %d.\n", name, queue->id);
                    while(j <= queue->numBags){
                        printf("Consumer %s checks in bag %d for     producer %d; baggage tag is _X_.\n", name, j, queue->id);
                        j++;
                }
            // Signal producer being serviced that their check in is complete.
            i++;
            pthread_mutex_unlock(&queue_lock);
            produced--;
            pthread_cond_signal(&ct);
        }
    sleep(3);
    }
}

int Consumer(char *Name) {

    sleep(5);
    int ret;
    pthread_t stid;
    // Create a staff thread

    ret = pthread_create(&stid, NULL, consumerThread, (void *)Name);
    // Acquire the lock
    if(ret == 0) { 
        printf("Producer %s's service has begun!\n", Name);
        return 0;
    }
    else    return -1;
}

int main() {
    int ret = 0;
    char *staff_name = malloc(sizeof(char));
    int staff_check = 0;
    int trav_check = 0;
    int id;
    int bagnum;
    int travtime;
    FILE *consumer_fp;
    FILE *producer_fp;
    queue = malloc(sizeof(travs));
    queue = NULL;
    /*while(ret < 10){
        servicing[ret] = malloc(sizeof(travs));
        servicing[ret] = NULL;
    }*/

    // Initilize mutexes
    pthread_mutex_init(&queue_lock, NULL);
    //pthread_mutex_init(&staff_lock, NULL);

    // Initialize condition variables
    pthread_cond_init(&ct, NULL);
    pthread_cond_init(&cs, NULL);

    // Open the file so we can start reading from it

    consumer_fp = fopen("staff.txt", "r");
    producer_fp = fopen("travelers.txt", "r");

    staff_check = fscanf(consumer_fp, "%s", staff_name);
    trav_check = fscanf(producer_fp, "%d %d %d", &id, &bagnum, &travtime);
    while(1){   

        K:
        while(staff_check == 1) {
            Consumer(staff_name);
            staff_check = fscanf(consumer_fp, "%s", staff_name);
            goto L;
        }
        L:
        while(trav_check == 3) { 
            Producer(id, bagnum, travtime);
            trav_check = fscanf(producer_fp, "%d %d %d", &id, &bagnum,     &travtime);
            goto K;
        }

    pthread_exit(NULL);
    }

}

在这种情况下,每个生产者线程在返回之前只存在很短的时间,并且除了向全局队列中添加一个新元素和一些适当定时的输出行之外,它本身并不进行真正的计算。

但是,当我介绍多个生产者时,只有最后一个生产者线程会做任何事情。

据我推测,我需要以下内容:

i) 等待签入的生产者和当前正在签入的生产者的单独队列(上面注释为 travs *servicing[MAX])

ii) 用于消费者的单独互斥锁。

但是,我不确定如何实现这一点。这是我的想法:

  1. CheckIn() 生产者线程并将 *queue 复制到 *servicing[i] (在消费者线程中)。

  2. 设置 queue = queue->next(在生产者线程中)。

但是,我如何保证当我复制 *queue 时它不会已经前进了一步?我可以用与线程当前持有的锁不同的锁向等待线程发出信号吗?而且,更重要的是,我将如何让不同的消费者线程处理不同的旅行者线程?

任何帮助将不胜感激!

4

2 回答 2

4

正如我所提到的,这是一个内存泄漏:

travs *traveler = malloc(sizeof(travs));
traveler = (travs *)args;

我不打算详细介绍“内存泄漏有什么不好?”。如果你想得到那个答案,就问谷歌这个问题。您可能的意思是:travs *traveler = args;.


if(queue != NULL) {
    travs *check_in = malloc(sizeof(travs));
    check_in = queue;
    while(check_in->next != NULL){
        check_in = check_in->next;
    }
    check_in->next = traveler;
}
else { queue = traveler; }

抛开内存泄漏不谈,为什么之前在其他函数中对队列互斥量进行了保护,而在这段代码中根本没有对互斥量进行保护?似乎您错过了互斥锁的要点。你的代码竞赛,在这里。

也许pthread_rwlock_ts 会更适合这种代码。

于 2013-04-19T03:34:10.203 回答
4

使用一个队列。

编写两个函数,一个将现有项目添加到队列中,一个从队列中删除一个项目。不要在这些函数中使用任何锁定。在单线程应用程序中测试它们。

然后为这两个添加和删除函数编写两个包装器。这些包装器应该将额外的互斥锁作为参数。在调用添加或删除函数之前将这个互斥锁锁定在包装器中,然后解锁互斥锁。

编写生产者线程函数,创建一个新项目并调用 add-item-wrapper。编写调用 remove-item-wrapper 的消费者线程函数并销毁已删除的项目。

设置main()声明和初始化互斥锁的函数,然后使用pthread_create(). 将互斥锁作为参数传递给线程函数。

于 2013-04-19T06:14:43.213 回答