0

我正在通过 Douglas Niehaus 的课程学习如何使用 pthreads。

目前我正在做其中的生产者-消费者问题。这是我到目前为止写出的解决方案:

生产者:

/*
 * Producer thread entry point.
 *
 * parg points to a pcdata holding the queue to fill (q), a pointer to the
 * production counter (count) and this thread's id (tid). The thread
 * repeatedly does its work and then adds one item to the queue, until the
 * counter reaches WORK_MAX. The counter is only read and written while
 * fifo->mutex is held. Always returns NULL.
 */
void *producer (void *parg) {
  pcdata *mydata         = (pcdata *) parg;
  queue  *fifo           = mydata->q;
  int    *total_produced = mydata->count;
  int     my_tid         = mydata->tid;
  int     item_produced;

  /*
   * Continue producing until the total produced reaches the
   * configured maximum
   */
  while (1) {
    /*
     * Do work to produce an item. Then get a slot in the queue for
     * it. Finally, at the end of the loop, outside the critical
     * section, announce that we produced it.
     */
    do_work(PRODUCER_CPU, PRODUCER_BLOCK);

    pthread_mutex_lock(fifo->mutex);

    /*
     * If the queue is full, we have no place to put anything we
     * produce, so wait until it is not full. The "<" test is the exact
     * complement of the ">=" exit test below, so a waiter can never
     * stay asleep once the quota has been met or exceeded.
     */
    while (fifo->full && *total_produced < WORK_MAX) {
      pthread_cond_wait(fifo->notFull, fifo->mutex);
    }

    if (*total_produced >= WORK_MAX) {
      printf("PRODUCER %d is exitting because the production reached the MAX\n",my_tid);
      /*
       * Wake every producer still blocked on notFull so that all of
       * them re-check the counter and exit; a single signal would only
       * release one waiter and depend on it passing the wake-up on.
       */
      pthread_cond_broadcast(fifo->notFull);
      pthread_mutex_unlock(fifo->mutex);
      break;
    }

    /*
     * OK, so we produce an item. Increment the counter of total
     * widgets produced, and add the new widget ID, its number, to the
     * queue.
     */
    item_produced = (*total_produced)++;
    queueAdd (fifo, item_produced);

    pthread_cond_signal(fifo->notEmpty);
    pthread_mutex_unlock(fifo->mutex);

    /*
     * Announce the production outside the critical section
     */
    printf("prod %d:\t %d.\n", my_tid, item_produced);
  }

  printf("prod %d:\texited\n", my_tid);
  return (NULL);
}

消费者:

    /*
     * Consumer thread entry point.
     *
     * carg points to a pcdata holding the queue to drain (q), a pointer to
     * the consumption counter (count) and this thread's id (tid). The
     * thread repeatedly removes one item from the queue and then does its
     * work, until the counter reaches WORK_MAX. The counter is only read
     * and written while fifo->mutex is held. Always returns NULL.
     */
    void *consumer (void *carg)
    {
      pcdata *mydata         = (pcdata *) carg;
      queue  *fifo           = mydata->q;
      int    *total_consumed = mydata->count;
      int     my_tid         = mydata->tid;
      int     item_consumed;

      /*
       * Continue consuming until the total consumed by all consumers
       * reaches the configured maximum
       */
      while (1) {
        pthread_mutex_lock(fifo->mutex);

        /*
         * If the queue is empty, there is nothing to do, so wait until it
         * is not empty. The "<" test is the exact complement of the ">="
         * exit test below, so a waiter can never stay asleep once the
         * quota has been met or exceeded.
         */
        while (fifo->empty && *total_consumed < WORK_MAX) {
          pthread_cond_wait(fifo->notEmpty, fifo->mutex);
        }

        if (*total_consumed >= WORK_MAX) {
          printf("CONSUMER %d is exitting because the compsuption reached the MAX\n",my_tid);
          /*
           * Wake every consumer still blocked on notEmpty so that all of
           * them re-check the counter and exit; a single signal would
           * only release one waiter and depend on it passing the wake-up
           * on.
           */
          pthread_cond_broadcast(fifo->notEmpty);
          pthread_mutex_unlock(fifo->mutex);
          break;
        }

        /*
         * Remove the next item from the queue. Increment the count of the
         * total consumed. Note that item_consumed is a local copy so this
         * thread can retain a memory of which item it consumed even if
         * others are busy consuming them.
         */
        queueRemove (fifo, &item_consumed);
        (*total_consumed)++;
        pthread_cond_signal(fifo->notFull);
        pthread_mutex_unlock(fifo->mutex);

        /*
         * Do work outside the critical region to consume the item
         * obtained from the queue and then announce its consumption.
         *
         * NOTE(review): both arguments are CONSUMER_CPU, whereas the
         * producer passes (PRODUCER_CPU, PRODUCER_BLOCK). This looks like
         * a copy/paste slip -- confirm whether the second argument should
         * be a consumer blocking-time constant.
         */
        do_work(CONSUMER_CPU,CONSUMER_CPU);
        printf ("con %d:\t %d.\n", my_tid, item_consumed);
      }

      printf("con %d:\texited\n", my_tid);
      return (NULL);
    }

当生产者和消费者的数量较少时(例如 100 个消费者和 50 个生产者,或者反过来),这段代码似乎可以正常工作。但是,如果我用超过 2000 个生产者和/或消费者来运行它,执行就会挂起。我想我遇到了死锁,但我找不到它在哪里。

任何帮助,将不胜感激。

注 1:

下面两个 if 代码块内部的信号调用(pthread_cond_signal):

if(*total_produced >= WORK_MAX) { ... }

和

if(*total_consumed >= WORK_MAX) { ... }

是我后来加上的,因为没有它们时,只要生产者和/或消费者超过 5 个,执行就会挂起。

我的理由是,如果已达到工作限制,则应告知其他生产者,以便他们也可以退出。消费者也是如此。

注 2:

以下代码已提供给学生:

/*
 * Fixed-capacity queue of ints together with the synchronization objects
 * used around it. queueInit() allocates one of these, starts it out empty
 * (empty = 1, full = 0, head = tail = 0) and heap-allocates and initializes
 * the mutex and both condition variables.
 */
typedef struct {
  int buf[QUEUESIZE];   /* Array for queue contents, managed as a circular queue */
  int head;             /* Index of the queue head */
  int tail;             /* Index of the queue tail, the next empty slot */  

  int full;             /* Flag set when the queue is full  */
  int empty;            /* Flag set when the queue is empty */

  pthread_mutex_t *mutex;     /* Mutex protecting this queue's data */
  pthread_cond_t  *notFull;   /* Producers wait on this for room to produce */
  pthread_cond_t  *notEmpty;  /* Consumers wait on this for something to consume */
} queue;

/*
 * Per-thread argument block handed to a producer or consumer thread
 * through the void* parameter of its start routine.
 */
typedef struct {
  queue *q;       /* Queue the thread adds to or removes from */
  int   *count;   /* Running total of items produced/consumed; the threads
                     read and update it while holding q->mutex */
  int    tid;     /* Thread id used in log output (main passes the loop index) */
} pcdata;

/******************************************************/

/*
 * Allocate and initialize an empty queue together with its mutex and its
 * notFull / notEmpty condition variables.
 *
 * Returns the new queue, or NULL if any allocation or pthread
 * initialization fails; on failure everything acquired so far is released.
 */
queue *queueInit (void) {
  /*
   * Allocate the structure that holds all queue information
   */
  queue *q = malloc (sizeof *q);
  if (q == NULL) return (NULL);

  /*
   * Initialize the state variables. See the definition of the queue
   * structure for the definition of each.
   */
  q->empty = 1;
  q->full  = 0;

  q->head  = 0;
  q->tail  = 0;

  /*
   * Allocate the mutex and both condition variables up front so that a
   * single check covers all three and the cleanup path can free them
   * unconditionally (free(NULL) is a no-op).
   */
  q->mutex    = malloc (sizeof *q->mutex);
  q->notFull  = malloc (sizeof *q->notFull);
  q->notEmpty = malloc (sizeof *q->notEmpty);
  if (q->mutex == NULL || q->notFull == NULL || q->notEmpty == NULL)
    goto fail_alloc;

  /*
   * Initialize them; the pthread init functions return 0 on success and
   * an error number otherwise.
   */
  if (pthread_mutex_init (q->mutex, NULL) != 0)
    goto fail_alloc;
  if (pthread_cond_init (q->notFull, NULL) != 0)
    goto fail_mutex;
  if (pthread_cond_init (q->notEmpty, NULL) != 0)
    goto fail_notfull;

  return (q);

fail_notfull:
  pthread_cond_destroy (q->notFull);
fail_mutex:
  pthread_mutex_destroy (q->mutex);
fail_alloc:
  free (q->notEmpty);
  free (q->notFull);
  free (q->mutex);
  free (q);
  return (NULL);
}

main 函数中的相关代码:

/*
 * Program entry point (excerpt): creates the shared queue and starts the
 * producer threads. The "More initializations" markers stand for code that
 * is not shown here; pros, pro and procount are presumably set up there --
 * verify before relying on this excerpt as-is.
 */
int main (int argc, char *argv[]) {
  // More initializations
  pthread_t *con;
  int        cons;
  int       *concount;

  queue     *fifo;
  int        i;

  pthread_t *pro;
  int       *procount;
  int        pros;

  pcdata    *thread_args;
  int        rc;

  fifo = queueInit ();
  if (fifo ==  NULL) {
    fprintf (stderr, "main: Queue Init failed.\n");
    exit (1);
  }
  // More initializations and creation of the threads:

  for (i=0; i<pros; i++){
    /*
     * Allocate memory for each producer's arguments
     */
    thread_args = malloc (sizeof *thread_args);
    if (thread_args == NULL) {
      fprintf (stderr, "main: Thread_Args Init failed.\n");
      exit (1);
    }

    /*
     * Fill them in and then create the producer thread
     */
    thread_args->q     = fifo;
    thread_args->count = procount;
    thread_args->tid   = i;

    /*
     * pthread_create returns an error number instead of setting errno.
     * With thousands of threads it can fail for lack of resources; if
     * that goes unnoticed, pro[i] is never set and fewer workers than
     * expected are running, so stop right away.
     */
    rc = pthread_create (&pro[i], NULL, producer, thread_args);
    if (rc != 0) {
      fprintf (stderr, "main: creating producer %d failed (error %d).\n", i, rc);
      exit (1);
    }
  }
}

}

4

0 回答 0