0

情况

在阅读了Unix Socket Programming W.Richard Steven之后,我正在编写一个 P2P 程序,其中主线程创建线程池,其中有五个子线程。然后它用 . 监视 50 个套接字kqueue()。当指定套接字中发生事件(例如,在套接字上接收数据)时,主线程将套接字描述符复制到共享数组中并唤醒线程池中的一个线程。然后子线程处理来自套接字的请求。此外,我使用互斥变量和条件变量保护了共享数组。

问题

作者分别在书中30.12节和30.13节给出了源代码“server/serv08.c”和“server/pthread08.c”,好像这段代码没有问题一样。但是,当我编写了一个类似于在场的一位作者的代码片段时,线程同步不能很好地工作。为什么在主线程 中iput变得等于?iget

代码
——全局变量——

typedef struct tagThread_information
{

    int     sockfd;

} Thread_information;
Thread_information      peer_fds[MAX_THREAD];
pthread_mutex_t         peerfd_mutex;
pthread_cond_t          peerfd_cond;
pthread_mutex_t         STDOUT_mutex;
int                     iput;
int                     iget;

--主线程--

void Wait_for_Handshake(download_session *pSession, int nMaxPeers)
{
struct kevent   ev[50], result[50];
int             kq, i, nfd;
int c = 1;

if( (kq = kqueue()) == -1)
{
    fprintf(stderr, "fail to initialize kqueue.\n");
    exit(0);
}

for(i = 0 ; i < nMaxPeers; i++)
{
    EV_SET(&ev[i], pSession->Peers[i].sockfd, EVFILT_READ, EV_ADD, 0, 0, 0);
    printf("socket : %d\n", (int)ev[i].ident);
}

// create thread pool. initialize mutex and conditional variable.
iput = 0;
iget = 0;
pthread_mutex_init(&STDOUT_mutex, NULL);
pthread_mutex_init(&peerfd_mutex, NULL);
pthread_cond_init(&peerfd_cond, NULL);

// Assume that MAX_THREAD is set to 5.
for(i = 0 ; i < MAX_THREAD; i++) 
    thread_make(i); 


while(1)    
{        
    nfd = kevent(kq, ev, nMaxPeers, result, nMaxPeers, NULL);

    if(nfd == -1)
    {
        fprintf(stderr, "fail to monitor kqueue. error : %d\n", errno);
        nMaxPeers           =   Update_peer(ev, pSession->nPeers);
        pSession->nPeers    =   nMaxPeers;
        continue;
    }

    for(i = 0 ; i < nfd; i++)
    {

        pthread_mutex_lock(&peerfd_mutex);
        peer_fds[iput].sockfd = (int)result[i].ident;
        if( ++iput == MAX_THREAD)
            iput = 0;
        if(iput == iget) // Here is my question.
        {
            exit(0);
        }

        pthread_cond_signal(&peerfd_cond);
        pthread_mutex_unlock(&peerfd_mutex);


    }

}

}

--子线程--

void * thread_main(void *arg)
{
    int     connfd, nbytes;
    char    buf[2048];
    for( ; ; )
    {
        /* get socket descriptor */
        pthread_mutex_lock(&peerfd_mutex);
        while( iget == iput)
            pthread_cond_wait(&peerfd_cond, &peerfd_mutex);
        connfd  =   peer_fds[iget].sockfd;
        if ( ++iget == MAX_THREAD )
            iget = 0;
        pthread_mutex_unlock(&peerfd_mutex);


    /* process a request on socket descriptor. */
    nbytes  =   (int)read(connfd, buf, 2048);

    if(nbytes == 0)
    {
        pthread_mutex_lock(&STDOUT_mutex);
        printf("\n\nthread %ld, socket : %d, nbytes : %d\n\n\n", (long int)pthread_self(), connfd, nbytes);
        printf("socket closed\n\n");
        pthread_mutex_unlock(&STDOUT_mutex);
        close(connfd);
        continue;
    }
    else if(nbytes == -1)
    {
        close(connfd);
        pthread_mutex_lock(&STDOUT_mutex);
        printf("\n\nthread %ld, socket : %d, nbytes : %d\n\n\n", (long int)pthread_self(), connfd, nbytes);
        perror("socket error : ");
        write(STDOUT_FILENO, buf, nbytes);
        printf("\n\n\n\n");
        pthread_mutex_unlock(&STDOUT_mutex);

        continue;
    }

    pthread_mutex_lock(&STDOUT_mutex);
    printf("\n\nthread %ld, socket : %d, nbytes : %d\n\n\n", (long int)pthread_self(), connfd, nbytes);
    write(STDOUT_FILENO, buf, nbytes);
    printf("\n\n\n\n");
    pthread_mutex_unlock(&STDOUT_mutex);

}
}
4

3 回答 3

0

你有一个典型的循环缓冲区实现。

当循环缓冲区为空时,头和尾指针/索引指向相同的位置。您可以在代码中看到这正在测试,while (iget == iput) ...这意味着“当队列为空时......”。

如果在循环缓冲区的头部插入后,头部指向尾部,那就是一个问题。缓冲区已溢出。这是一个问题,因为现在缓冲区现在看起来是空的,即使它已满。

即在缓冲区中保留一个未使用的位置;如果缓冲区有 4096 个条目,我们只能填充 4095。如果我们填充 4096,那么就会溢出:它看起来像一个空的循环缓冲区。

(如果我们允许索引从 0 到 8192,我们可以使用所有 4096 个位置,使用一个额外的位来解决歧义,这样指针不会在 4095 之后回绕到零,而是继续指向 4096 ... 8191。当然,我们必须记住访问以 4096 为模的数组。为了恢复一个浪费的元素,这在复杂性上是一个很大的成本。)

看起来代码因循环缓冲区溢出而退出,因为它的结构使得这种情况不会发生,因此它构成了一个内部错误。当一次回合中从生产者向消费者传递的描述符过多时,循环缓冲区会溢出。

通常,循环缓冲区代码不能在缓冲区已满时退出。要么插入操作必须阻止并返回错误,要么必须阻塞以获得更多空间。因此,这是基于示例程序特定假设的特殊情况。

于 2013-08-09T16:24:30.640 回答
0

在您的主线程中:

if( ++iput == MAX_THREAD)
            iput = 0;// so iput is 0 --> MAX_THREAD

在你的子线程中:

 if ( ++iget == MAX_THREAD )
            iget = 0;// So iget is 0 --> MAX_THREAD

由于子线程和主线程“同时”运行,并且它们是全局值。输入可能在某个时候等于 iget。

于 2013-08-04T09:02:25.853 回答
0

摘自“UNIX Network Prgramming Volume 1, 2nd Edition”,第 27.12 章,第 757 页,从注释到第 27-38 行server/serv08.c

我们还检查iput索引没有赶上iget索引,这表明我们的数组不够大。

供参考上述行(取自此处):

27 for ( ; ; ) {
28   clilen = addrlen;
29   connfd = Accept(listenfd, cliaddr, &clilen);   
30   Pthread_mutex_lock(&clifd_mutex);
31   clifd[iput] = connfd;
32   if (++iput == MAXNCLI)
33     iput = 0;
34   if (iput == iget)
35     err_quit("iput = iget = %d", iput);
36   Pthread_cond_signal(&clifd_cond);
37   Pthread_mutex_unlock(&clifd_mutex);
38 }
于 2013-08-09T16:14:03.510 回答