1

我在Linux环境下使用C编程语言实现了一个线程池。有一个阻塞任务队列,我们​​可以把任务放到任务队列中,线程池中的线程从任务队列中获取任务,如果任务队列为空,它缠绕块等待表示任务队列不为空的条件信号。线程池中的线程运行的函数是:

    static void *do_worker(void *arg)
    {
        printf("new thread %d\n", pthread_self());
        ThreadPool *pool = (ThreadPool *)arg;
        while (1) {
           printf("thread %d try to get new task\n", pthread_self());
           Task task;
           //printf("before get_task\n");
           task = get_task(&pool->task_queue);
           printf("thread %d get_task\n", pthread_self());
           if (task.ftn == NULL ) {
               break;
           }
           task.ftn(task.arg);
           //printf("finish new task\n");
           //printf("I'm thread %d\n", pthread_self());
        }
        printf("thread %d exit\n", pthread_self());
        pthread_exit(NULL);
    }

当我将普通任务添加到队列时似乎很好,但是当我尝试使用毒任务破坏线程池时,就像这样:

    void destroy_threadpool(ThreadPool *pool)
    {
     int i;
     /* put poison to let the threads exit */
     for (i = 0; i < pool->t_cnt; i++) {
         Task null_task;
         null_task.ftn = NULL;
         null_task.arg = NULL;
         printf("insert poison task\n");
         insert_task(pool, null_task);
     }

     for (i = 0; i < pool->t_cnt; i++) {
         pthread_join(pool->tids[i], NULL);
     }
    }

它不起作用。当我测试这个实现时,我首先让线程池中的线程在等待非空条件时阻塞,然后调用destroy_threadpool函数。但是,有的线程可以退出,有的不能,仍然阻塞在等待条件中,导致主线程在pthread_join中阻塞。什么是问题?

get_task 和 put_task 的代码是:

    Task get_task(TaskQueue *queue)
    {
     Task task;

     pthread_mutex_lock(&queue->mutex);

     while (queue->front == queue->back) {

         printf("thread %d blocks!\n", pthread_self());
         pthread_cond_wait(&queue->no_empty, &queue->mutex);

     }

     /* get the front task in the queue */
     memcpy(&task, &queue->tasks[queue->front], sizeof(Task));
     if (++queue->front == MAXTASK) {
         queue->front = 0;
     }
     pthread_cond_signal(&queue->no_full);
     pthread_mutex_unlock(&queue->mutex);

     return task;
    }

    void put_task(TaskQueue *queue, Task task)
    {
     pthread_mutex_lock(&queue->mutex);

     while (((queue->back + 1) == queue->front) || (queue->back - queue->front == MAXTASK - 1)) {
         pthread_cond_wait(&queue->no_full, &queue->mutex);
     }

     memcpy(&queue->tasks[queue->back], &task, sizeof(task));
     if (++queue->back == MAXTASK) {
         queue->back = 0;
     }

     pthread_cond_signal(&queue->no_empty);

     pthread_mutex_unlock(&queue->mutex);

    }

ThreadPool和TaskQueue的定义是:

    typedef struct {
        void *(*ftn)(void *arg);
        void *arg;
    } Task;

    typedef struct {
        Task tasks[MAXTASK];
        int front;
        int back;
        pthread_mutex_t mutex;
        pthread_cond_t no_empty;
        pthread_cond_t no_full;
    } TaskQueue;

    typedef struct {
        pthread_t tids[MAXTHREAD];
        int t_cnt;
        TaskQueue task_queue;
    } ThreadPool;
4

0 回答 0