我正在尝试学习如何使用Douglas Niehaus 的pthreads
课程。
现在我正处于生产者-消费者问题的中间。这是我到目前为止达到的解决方案:
生产商:
void *producer (void *parg) {
queue *fifo;
int item_produced;
pcdata *mydata;
int my_tid;
int *total_produced;
mydata = (pcdata *) parg;
fifo = mydata->q;
total_produced = mydata->count;
my_tid = mydata->tid;
/*
* Continue producing until the total produced reaches the
* configured maximum
*/
while (1) {
/*
* Do work to produce an item. Tthe get a slot in the queue for
* it. Finally, at the end of the loop, outside the critical
* section, announce that we produced it.
*/
do_work(PRODUCER_CPU, PRODUCER_BLOCK);
pthread_mutex_lock(fifo->mutex);
/*
* If the queue is full, we have no place to put anything we
* produce, so wait until it is not full.
*/
while(fifo->full && *total_produced != WORK_MAX) {
pthread_cond_wait(fifo->notFull,fifo->mutex);
}
if(*total_produced >= WORK_MAX) {
printf("PRODUCER %d is exitting because the production reached the MAX\n",my_tid);
pthread_cond_signal(fifo->notFull);
pthread_mutex_unlock(fifo->mutex);
break;
}
/*
* OK, so we produce an item. Increment the counter of total
* widgets produced, and add the new widget ID, its number, to the
* queue.
*/
item_produced = (*total_produced)++;
queueAdd (fifo, item_produced);
pthread_cond_signal(fifo->notEmpty);
pthread_mutex_unlock(fifo->mutex);
/*
* Announce the production outside the critical section
*/
printf("prod %d:\t %d.\n", my_tid, item_produced);
}
printf("prod %d:\texited\n", my_tid);
return (NULL);
}
消费者:
void *consumer (void *carg)
{
queue *fifo;
int item_consumed;
pcdata *mydata;
int my_tid;
int *total_consumed;
mydata = (pcdata *) carg;
fifo = mydata->q;
total_consumed = mydata->count;
my_tid = mydata->tid;
/*
* Continue producing until the total consumed by all consumers
* reaches the configured maximum
*/
while (1) {
/*
* If the queue is empty, there is nothing to do, so wait until it
* si not empty.
*/
pthread_mutex_lock(fifo->mutex);
while(fifo->empty && *total_consumed != WORK_MAX) {
pthread_cond_wait(fifo->notEmpty,fifo->mutex);
}
if(*total_consumed >= WORK_MAX) {
printf("CONSUMER %d is exitting because the compsuption reached the MAX\n",my_tid);
pthread_cond_signal(fifo->notEmpty);
pthread_mutex_unlock(fifo->mutex);
break;
}
/*
* Remove the next item from the queue. Increment the count of the
* total consumed. Note that item_consumed is a local copy so this
* thread can retain a memory of which item it consumed even if
* others are busy consuming them.
*/
queueRemove (fifo, &item_consumed);
(*total_consumed)++;
pthread_cond_signal(fifo->notFull);
pthread_mutex_unlock(fifo->mutex);
/*
* Do work outside the critical region to consume the item
* obtained from the queue and then announce its consumption.
*/
do_work(CONSUMER_CPU,CONSUMER_CPU);
printf ("con %d:\t %d.\n", my_tid, item_consumed);
}
printf("con %d:\texited\n", my_tid);
return (NULL);
}
这似乎适用于相当少数的生产者和消费者,例如 100 个消费者和 50 个生产者,反之亦然。但是,如果我尝试使用超过 2000 个生产者和/或消费者的代码,执行就会挂起。我想我已经陷入僵局,但我找不到它。
任何帮助,将不胜感激。
笔记:
内部的信号if-block
:
if(*total_consumed >= WORK_MAX) { ... }
和if-block
:
if(*total_consumed >= WORK_MAX) {...}
是我添加的东西,因为没有它们的执行会挂断超过 5 个生产者和/或消费者。
我的理由是,如果已达到工作限制,则应告知其他生产者,以便他们也可以退出。消费者也是如此。
笔记2:
以下代码已提供给学生:
typedef struct {
int buf[QUEUESIZE]; /* Array for Queue contents, managed as circular queue */
int head; /* Index of the queue head */
int tail; /* Index of the queue tail, the next empty slot */
int full; /* Flag set when queue is full */
int empty; /* Flag set when queue is empty */
pthread_mutex_t *mutex; /* Mutex protecting this Queue's data */
pthread_cond_t *notFull; /* Used by producers to await room to produce*/
pthread_cond_t *notEmpty; /* Used by consumers to await something to consume*/
} queue;
typedef struct {
queue *q;
int *count;
int tid;
} pcdata;
/******************************************************/
queue *queueInit (void) {
queue *q;
/*
* Allocate the structure that holds all queue information
*/
q = (queue *)malloc (sizeof (queue));
if (q == NULL) return (NULL);
/*
* Initialize the state variables. See the definition of the Queue
* structure for the definition of each.
*/
q->empty = 1;
q->full = 0;
q->head = 0;
q->tail = 0;
/*
* Allocate and initialize the queue mutex
*/
q->mutex = (pthread_mutex_t *) malloc (sizeof (pthread_mutex_t));
pthread_mutex_init (q->mutex, NULL);
/*
* Allocate and initialize the notFull and notEmpty condition
* variables
*/
q->notFull = (pthread_cond_t *) malloc (sizeof (pthread_cond_t));
pthread_cond_init (q->notFull, NULL);
q->notEmpty = (pthread_cond_t *) malloc (sizeof (pthread_cond_t));
pthread_cond_init (q->notEmpty, NULL);
return (q);
}
而在main:
int main (int argc, char *argv[]) {
// More initializations
pthread_t *con;
int cons;
int *concount;
queue *fifo;
int i;
pthread_t *pro;
int *procount;
int pros;
pcdata *thread_args;
fifo = queueInit ();
if (fifo == NULL) {
fprintf (stderr, "main: Queue Init failed.\n");
exit (1);
}
// More initializations and creation of the threads:
for (i=0; i<pros; i++){
/*
* Allocate memory for each producer's arguments
*/
thread_args = (pcdata *)malloc (sizeof (pcdata));
if (thread_args == NULL) {
fprintf (stderr, "main: Thread_Args Init failed.\n");
exit (1);
}
/*
* Fill them in and then create the producer thread
*/
thread_args->q = fifo;
thread_args->count = procount;
thread_args->tid = i;
pthread_create (&pro[i], NULL, producer, thread_args);
}
}
}