情况
在阅读了Unix Socket Programming W.Richard Steven之后,我正在编写一个 P2P 程序,其中主线程创建线程池,其中有五个子线程。然后它用 . 监视 50 个套接字kqueue()
。当指定套接字中发生事件(例如,在套接字上接收数据)时,主线程将套接字描述符复制到共享数组中并唤醒线程池中的一个线程。然后子线程处理来自套接字的请求。此外,我使用互斥变量和条件变量保护了共享数组。
问题
作者分别在书中30.12节和30.13节给出了源代码“server/serv08.c”和“server/pthread08.c”,好像这段代码没有问题一样。但是,当我编写了一个类似于在场的一位作者的代码片段时,线程同步不能很好地工作。为什么在主线程 中iput
变得等于?iget
代码
——全局变量——
typedef struct tagThread_information
{
int sockfd;
} Thread_information;
Thread_information peer_fds[MAX_THREAD];
pthread_mutex_t peerfd_mutex;
pthread_cond_t peerfd_cond;
pthread_mutex_t STDOUT_mutex;
int iput;
int iget;
--主线程--
void Wait_for_Handshake(download_session *pSession, int nMaxPeers)
{
struct kevent ev[50], result[50];
int kq, i, nfd;
int c = 1;
if( (kq = kqueue()) == -1)
{
fprintf(stderr, "fail to initialize kqueue.\n");
exit(0);
}
for(i = 0 ; i < nMaxPeers; i++)
{
EV_SET(&ev[i], pSession->Peers[i].sockfd, EVFILT_READ, EV_ADD, 0, 0, 0);
printf("socket : %d\n", (int)ev[i].ident);
}
// create thread pool. initialize mutex and conditional variable.
iput = 0;
iget = 0;
pthread_mutex_init(&STDOUT_mutex, NULL);
pthread_mutex_init(&peerfd_mutex, NULL);
pthread_cond_init(&peerfd_cond, NULL);
// Assume that MAX_THREAD is set to 5.
for(i = 0 ; i < MAX_THREAD; i++)
thread_make(i);
while(1)
{
nfd = kevent(kq, ev, nMaxPeers, result, nMaxPeers, NULL);
if(nfd == -1)
{
fprintf(stderr, "fail to monitor kqueue. error : %d\n", errno);
nMaxPeers = Update_peer(ev, pSession->nPeers);
pSession->nPeers = nMaxPeers;
continue;
}
for(i = 0 ; i < nfd; i++)
{
pthread_mutex_lock(&peerfd_mutex);
peer_fds[iput].sockfd = (int)result[i].ident;
if( ++iput == MAX_THREAD)
iput = 0;
if(iput == iget) // Here is my question.
{
exit(0);
}
pthread_cond_signal(&peerfd_cond);
pthread_mutex_unlock(&peerfd_mutex);
}
}
}
--子线程--
void * thread_main(void *arg)
{
int connfd, nbytes;
char buf[2048];
for( ; ; )
{
/* get socket descriptor */
pthread_mutex_lock(&peerfd_mutex);
while( iget == iput)
pthread_cond_wait(&peerfd_cond, &peerfd_mutex);
connfd = peer_fds[iget].sockfd;
if ( ++iget == MAX_THREAD )
iget = 0;
pthread_mutex_unlock(&peerfd_mutex);
/* process a request on socket descriptor. */
nbytes = (int)read(connfd, buf, 2048);
if(nbytes == 0)
{
pthread_mutex_lock(&STDOUT_mutex);
printf("\n\nthread %ld, socket : %d, nbytes : %d\n\n\n", (long int)pthread_self(), connfd, nbytes);
printf("socket closed\n\n");
pthread_mutex_unlock(&STDOUT_mutex);
close(connfd);
continue;
}
else if(nbytes == -1)
{
close(connfd);
pthread_mutex_lock(&STDOUT_mutex);
printf("\n\nthread %ld, socket : %d, nbytes : %d\n\n\n", (long int)pthread_self(), connfd, nbytes);
perror("socket error : ");
write(STDOUT_FILENO, buf, nbytes);
printf("\n\n\n\n");
pthread_mutex_unlock(&STDOUT_mutex);
continue;
}
pthread_mutex_lock(&STDOUT_mutex);
printf("\n\nthread %ld, socket : %d, nbytes : %d\n\n\n", (long int)pthread_self(), connfd, nbytes);
write(STDOUT_FILENO, buf, nbytes);
printf("\n\n\n\n");
pthread_mutex_unlock(&STDOUT_mutex);
}
}