0
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <string.h>
#include <errno.h>
#include <signal.h>
#include <wait.h>
#include <pthread.h>

int item_to_produce, curr_buf_size;
int total_items, max_buf_size, num_workers, num_masters;
int consumed_items;
int *buffer;
pthread_mutex_t mutex;
pthread_cond_t  has_data;
pthread_cond_t  has_space;



void print_produced(int num, int master) {

  printf("Produced %d by master %d\n", num, master);

}

void print_consumed(int num, int worker) {

  printf("Consumed %d by worker %d\n", num, worker);

}

//consume items in buffer
void *consume_requests_loop(void *data)
{
  int thread_id = *((int *)data);

  while(1)
  {
    pthread_mutex_lock(&mutex); // mutex lock for consume
    if(consumed_items == total_items) {
       pthread_mutex_unlock(&mutex);
       break;
     }     

    while(curr_buf_size == 0) {
      pthread_cond_wait(&has_data, &mutex);
    }

    print_consumed(buffer[(curr_buf_size--)-1], thread_id);
    consumed_items++;

    pthread_cond_signal(&has_space);
    pthread_mutex_unlock(&mutex);
  }

  return 0;
}


//produce items and place in buffer
//modify code below to synchronize correctly
void *generate_requests_loop(void *data)
{
  int thread_id = *((int *)data);

  while(1) {

      pthread_mutex_lock(&mutex); // mutex lock for consume

      //all of items are produced
      //master threads need to join
      if(item_to_produce == total_items) {
        pthread_mutex_unlock(&mutex);
        break;
      }

      //there is no item to read
      while (curr_buf_size == max_buf_size) {

        pthread_cond_wait(&has_space, &mutex);

      }

      buffer[curr_buf_size++] = item_to_produce;
      print_produced(item_to_produce, thread_id);
      item_to_produce++;
      pthread_cond_signal(&has_data);
      pthread_mutex_unlock(&mutex); // mutex_produce unlock
    }

  return 0;
}



//write function to be run by worker threads
//ensure that the workers call the function print_consumed when they consume an item
int main(int argc, char *argv[])
{
  int *master_thread_id; // array of master_thread_id
  int *worker_thread_id; // array of worker_thread_id  
  pthread_t *master_thread; // array of master_thread
  pthread_t *worker_thread; // array of worker_thread

  item_to_produce = 0; // item will be produced by master_thread at next time
  curr_buf_size = 0; // index of item will be saved in
  consumed_items = 0;
  
  int i;  

   if (argc < 5) {

    printf("./master-worker #total_items #max_buf_size #num_workers #masters e.g. ./exe 10000 1000 4 3\n");
    exit(1);
  }
  else {

    num_masters = atoi(argv[4]);
    num_workers = atoi(argv[3]);
    total_items = atoi(argv[1]);
    max_buf_size = atoi(argv[2]);
  }

   buffer = (int *)malloc (sizeof(int) * max_buf_size);

   pthread_mutex_init(&mutex, NULL);
   pthread_cond_init(&has_space, NULL);
   pthread_cond_init(&has_data, NULL);

   //create master producer threads

   master_thread_id = (int *)malloc(sizeof(int) * num_masters);
   master_thread = (pthread_t *)malloc(sizeof(pthread_t) * num_masters);

  for (i = 0; i < num_masters; i++)
    master_thread_id[i] = i;

  for (i = 0; i < num_masters; i++)
    pthread_create(&master_thread[i], NULL, generate_requests_loop, (void *)&master_thread_id[i]);

  //create worker consumer threads
  worker_thread_id = (int *)malloc(sizeof(int) * num_workers);
  worker_thread = (pthread_t *)malloc(sizeof(pthread_t) * num_workers);

  for (i = 0; i < num_workers; i++)
    worker_thread_id[i] = i;

  for (i = 0 ; i < num_workers; i++)
    pthread_create(&worker_thread[i], NULL, consume_requests_loop, (void *)&worker_thread_id[i]);

  

  //wait for all threads to complete
  for (i = 0; i < num_masters; i++)
    {
      pthread_join(master_thread[i], NULL);
      printf("master %d joined\n", i);
    }

  for (i = 0; i < num_workers; i++)
    {
      pthread_join(worker_thread[i], NULL);
      printf("worker %d joined\n", i);
    }  

  /*----Deallocating Buffers---------------------*/

  free(buffer);
  free(master_thread_id);
  free(master_thread);
  free(worker_thread_id);
  free(worker_thread);
  pthread_mutex_destroy(&mutex);
  pthread_cond_destroy(&has_data);
  pthread_cond_destroy(&has_space);

  return 0;

}

此代码通过参数生成给定数字范围内的数字并使用它。

但是生产者会产生一个超出范围的数字,如果它符合条件则不会加入。消费者也是。

例如,当我给出 0~39(total_item = 500)、buff 大小 30(max_buf_size)、num_workers 5、num_master 3 之类的数字范围时,它不会仅产生和消耗 0~39 的数字。

它生产和消费超过 40 个数字。

4

1 回答 1

0

这样,线程处于循环中。要将线程置于睡眠状态,您可以使用例如条件变量。(您可以阅读此内容以获取更多信息https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_cond_wait.html

于 2020-11-29T00:44:59.597 回答