我在Linux环境下使用C编程语言实现了一个线程池。有一个阻塞任务队列,我们可以把任务放到任务队列中,线程池中的线程从任务队列中获取任务,如果任务队列为空,它缠绕块等待表示任务队列不为空的条件信号。线程池中的线程运行的函数是:
static void *do_worker(void *arg)
{
printf("new thread %d\n", pthread_self());
ThreadPool *pool = (ThreadPool *)arg;
while (1) {
printf("thread %d try to get new task\n", pthread_self());
Task task;
//printf("before get_task\n");
task = get_task(&pool->task_queue);
printf("thread %d get_task\n", pthread_self());
if (task.ftn == NULL ) {
break;
}
task.ftn(task.arg);
//printf("finish new task\n");
//printf("I'm thread %d\n", pthread_self());
}
printf("thread %d exit\n", pthread_self());
pthread_exit(NULL);
}
当我将普通任务添加到队列时似乎很好,但是当我尝试使用毒任务破坏线程池时,就像这样:
void destroy_threadpool(ThreadPool *pool)
{
int i;
/* put poison to let the threads exit */
for (i = 0; i < pool->t_cnt; i++) {
Task null_task;
null_task.ftn = NULL;
null_task.arg = NULL;
printf("insert poison task\n");
insert_task(pool, null_task);
}
for (i = 0; i < pool->t_cnt; i++) {
pthread_join(pool->tids[i], NULL);
}
}
它不起作用。当我测试这个实现时,我首先让线程池中的线程在等待非空条件时阻塞,然后调用destroy_threadpool函数。但是,有的线程可以退出,有的不能,仍然阻塞在等待条件中,导致主线程在pthread_join中阻塞。什么是问题?
get_task 和 put_task 的代码是:
Task get_task(TaskQueue *queue)
{
Task task;
pthread_mutex_lock(&queue->mutex);
while (queue->front == queue->back) {
printf("thread %d blocks!\n", pthread_self());
pthread_cond_wait(&queue->no_empty, &queue->mutex);
}
/* get the front task in the queue */
memcpy(&task, &queue->tasks[queue->front], sizeof(Task));
if (++queue->front == MAXTASK) {
queue->front = 0;
}
pthread_cond_signal(&queue->no_full);
pthread_mutex_unlock(&queue->mutex);
return task;
}
void put_task(TaskQueue *queue, Task task)
{
pthread_mutex_lock(&queue->mutex);
while (((queue->back + 1) == queue->front) || (queue->back - queue->front == MAXTASK - 1)) {
pthread_cond_wait(&queue->no_full, &queue->mutex);
}
memcpy(&queue->tasks[queue->back], &task, sizeof(task));
if (++queue->back == MAXTASK) {
queue->back = 0;
}
pthread_cond_signal(&queue->no_empty);
pthread_mutex_unlock(&queue->mutex);
}
ThreadPool和TaskQueue的定义是:
typedef struct {
void *(*ftn)(void *arg);
void *arg;
} Task;
typedef struct {
Task tasks[MAXTASK];
int front;
int back;
pthread_mutex_t mutex;
pthread_cond_t no_empty;
pthread_cond_t no_full;
} TaskQueue;
typedef struct {
pthread_t tids[MAXTHREAD];
int t_cnt;
TaskQueue task_queue;
} ThreadPool;