最近我正在研究 pthread 多线程库并做一些例子。
我尝试编写一个生产者-客户模块:有一个队列来存储生产者的产品,并且可以由客户获取。
我将队列 MAX-SIZE 设置为 20。当队列已满时,Producer 线程将等待,直到 Customer 线程消耗 1 并且他可以开始生产的 Producer 线程。和客户一样,当队列为空时,客户会等到生产者线程产生新的并通知他。:-)
我将客户线程的消耗设置为比生产快,它可以正常工作,因为日志输出确实符合我的预期。但是,当我设置生产者线程消耗比消耗快时,似乎最终导致了死锁:-(
我不知道原因,任何人都可以阅读我的代码并给我一些提示或如何修改代码吗?
谢谢!
#include "commons.h"
typedef struct tagNode {
struct tagNode *pNext;
char *pContent;
}NodeSt, *PNodeSt;
typedef struct {
size_t mNodeNum;
size_t mNodeIdx;
PNodeSt mRootNode;
}WorkQueue;
#define WORK_QUEUE_MAX 20
static pthread_cond_t g_adder_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t g_adder_mutex = PTHREAD_MUTEX_INITIALIZER;
static WorkQueue g_work_queue = {0};
//------------------------------------------------------------------------
void *customer_thread_runFunc(void *usrdat){
for( ; ; ) {
pthread_mutex_lock(&g_adder_mutex);{
while( g_work_queue.mNodeNum == 0 ) {
pthread_cond_wait(&g_adder_cond, &g_adder_mutex);
}
/********************** CONSUME NEW PRODUCT ***********************/
g_work_queue.mNodeNum --;
if( g_work_queue.mRootNode->pNext != NULL ) {
PNodeSt pTempNode = g_work_queue.mRootNode->pNext;
free( g_work_queue.mRootNode->pContent );
free( g_work_queue.mRootNode );
g_work_queue.mRootNode = pTempNode;
} else {
free( g_work_queue.mRootNode->pContent );
free( g_work_queue.mRootNode );
g_work_queue.mRootNode = NULL;
}
/********************** CONSUME PRODUCT END ***********************/
// Nofity Producer Thread
pthread_cond_signal(&g_adder_cond);
}pthread_mutex_unlock(&g_adder_mutex);
// PAUSE FOR 300ms
usleep(300);
}
return NULL;
}
//------------------------------------------------------------------------
void *productor_thread_runFunc( void *usrdat ) {
for( ; ; ) {
pthread_mutex_lock(&g_adder_mutex); {
char tempStr[64];
PNodeSt pNodeSt = g_work_queue.mRootNode;
while( g_work_queue.mNodeNum >= WORK_QUEUE_MAX ) {
pthread_cond_wait(&g_adder_cond, &g_adder_mutex);
}
/********************** PRODUCE NEW PRODUCT ***********************/
g_work_queue.mNodeNum ++;
g_work_queue.mNodeIdx ++;
if( pNodeSt != NULL ) {
for( ; pNodeSt->pNext != NULL; pNodeSt = pNodeSt->pNext );
pNodeSt->pNext = malloc(sizeof(NodeSt));
memset(pNodeSt->pNext, 0, sizeof(NodeSt));
sprintf( tempStr, "production id: %d", g_work_queue.mNodeIdx);
pNodeSt->pNext->pContent = strdup(tempStr);
} else {
g_work_queue.mRootNode = malloc(sizeof(NodeSt));
memset(g_work_queue.mRootNode, 0, sizeof(NodeSt));
sprintf( tempStr, "production id: %d", g_work_queue.mNodeIdx);
g_work_queue.mRootNode->pContent = strdup(tempStr);
}
/********************** PRODUCE PRODUCT END ***********************/
// Nofity Customer Thread
pthread_cond_signal(&g_adder_cond);
}pthread_mutex_unlock(&g_adder_mutex);
// PAUSE FOR 150ms, faster than Customer Thread
usleep(150);
}
return NULL;
}
//------------------------------------------------------------------------
int main(void) {
pthread_t pt1, pt3;
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_create(&pt1, &attr, customer_thread_runFunc, NULL);
pthread_create(&pt3, &attr, productor_thread_runFunc, NULL);
pthread_join(pt1, NULL);
pthread_join(pt3, NULL);
printf("MAIN - main thread finish!\n");
return EXIT_SUCCESS;
}