1

我是信号量和互斥概念的新手。我应该使用多线程通过目录递归地在文件中搜索文本。线程数由用户指定。

这段代码的问题是它通过一个目录然后等待。我不知道出了什么问题。我收到分段错误错误。无法弄清楚为什么会这样。

#include <iostream>
#include <sys/wait.h>
#include <sys/types.h>
#include <pthread.h>
#include <string.h>
#include <unistd.h>
#include <dirent.h>
#include <sys/stat.h>
#include <fstream>
#include <limits.h>
#include <stdlib.h>
#include <semaphore.h>
using namespace std;


#include <stdio.h>

int iDirectories=0;

pthread_mutex_t mutex=PTHREAD_MUTEX_INITIALIZER;
sem_t semaphore1;
char searchStringThread[PATH_MAX];

int directories=0;


class directoryQueue
{
private:
struct Node
{
    char directoryPath[PATH_MAX];
    Node *next;
};
Node *front;
Node *rear;
Node *nodeCount;

public:

    directoryQueue(void)
    {
        front=NULL;
        rear=NULL;
        nodeCount=0;
    }


    void Enqueue(char array[PATH_MAX])

    {
        Node *newNode;
        newNode=new Node;

        strcpy(newNode->directoryPath,array);
        newNode->next=NULL;
        if(isEmpty())
        {
            front=newNode;
            rear=newNode;
        }
        else
        {
            rear->next=newNode;
            rear=newNode;
        }
        nodeCount++;
    }
    char * Dequeue(void)
    {
        Node *temp;

        if (isEmpty())
            cout << "Error ! Empty Queue "<<endl;
        else
        {
            char *deque;
            deque=new char[PATH_MAX];
            strcpy(deque,front->directoryPath);

            temp = front->next;
            front = temp;
            nodeCount--;
            return deque;
        }

    }

    bool isEmpty(void)
    {
        if(nodeCount)
            return false;
        else
            return true;
    }

    void makeNull(void)
    {
        while(!isEmpty())
        {
            Dequeue();
        }
    }

    ~directoryQueue(void)
    {
        makeNull();
    }
};
directoryQueue saveDirectory;



void *threadHandler(void *)
{
int thpath_length;
char thPath[PATH_MAX];
char saveITDirectory[PATH_MAX];
char itDirectory[PATH_MAX];
int threadCount;
struct dirent *iWalker;
DIR *iDirectory;
pthread_mutex_lock(&mutex);
threadCount=iDirectories++;
pthread_mutex_unlock(&mutex);
sem_wait(&semaphore1);
pthread_mutex_lock(&mutex);
strcpy(itDirectory,saveDirectory.Dequeue());
pthread_mutex_unlock(&mutex);

iDirectory=opendir(itDirectory);
if(iDirectory==NULL)
{
    cout<<"Error"<<endl;
    cout<<itDirectory<<"  Cannot be Opened"<<endl;
    exit(10000);
}
while((iWalker=readdir(iDirectory)) !=NULL)
{
    if(iWalker->d_type==DT_REG)
    {




        strcpy(saveITDirectory,iWalker->d_name);
        cout<<itDirectory<<"/"<<endl;
        if (strcmp (saveITDirectory, "..") == 0 ||
            strcmp (saveITDirectory, ".") == 0) 
        {
            continue;
        }
        else
        {


            thpath_length = snprintf(thPath,PATH_MAX,"%s/%s",itDirectory,saveITDirectory);

            cout<<thPath<<endl;
            if (thpath_length >= PATH_MAX) 
            {
                cout<<"Path is too long"<<endl;
                exit (1000);
            }
            ifstream openFile;
            openFile.open(thPath);
            char line[1500];
            int currentLine = 0;
            if (openFile.is_open()) {
                while (openFile.good()) {
                    currentLine++;
                    openFile.getline(line, 1500);
                    if (strstr(line, searchStringThread) != NULL){
                        cout<<thPath<<": "<<currentLine<<": "<<line<<endl;
                        cout<<"This was performed by Thread no. "<<threadCount<<endl;
                        cout<<"ID :"<<pthread_self();
                    }
                }
            }
            openFile.close();    
        }
    }
    if (closedir (iDirectory)) 
    {
        cout<<"Unable to close  "<<itDirectory<<endl;
        exit (1000);
    }
}

}



void walkThroughDirectory(char directory_name[PATH_MAX],char  searchString[PATH_MAX])
{

DIR * directory;
struct dirent * walker;
char d_name[PATH_MAX];
int path_length;
char path[PATH_MAX];
directory=opendir(directory_name);
if(directory==NULL)
{
    cout<<"Error"<<endl;
    cout<<directory_name<<"  Cannot be Opened"<<endl;
    exit(10000);
}
while((walker=readdir(directory)) !=NULL)
{




    strcpy(d_name,walker->d_name);
    cout<<directory_name<<"/"<<endl;
    if (strcmp (d_name, "..") == 0 ||
        strcmp (d_name, ".") == 0) 
    {
        continue;
    }
    else
    {


        path_length = snprintf(path,PATH_MAX,"%s/%s",directory_name,d_name);

        cout<<path<<endl;
        if (path_length >= PATH_MAX) 
        {
            cout<<"Path is too long"<<endl;
            exit (1000);
        }
        if(walker->d_type==DT_DIR)
        {
            pthread_mutex_lock(&mutex);
            saveDirectory.Enqueue(path);
            pthread_mutex_lock(&mutex);
            sem_post(&semaphore1);
            directories++;
            walkThroughDirectory (path,searchString);
        }
        else if(walker->d_type==DT_REG)
        {   
            ifstream openFile;
            openFile.open(path);
            char line[1500];
            int currentLine = 0;
            if (openFile.is_open()) {
                while (openFile.good()) {
                    currentLine++;
                    openFile.getline(line, 1500);
                    if (strstr(line, searchString) != NULL)
                        cout<<path<<": "<<currentLine<<": "<<line<<endl;
                }
            }
            openFile.close();    
        }


    }

}

if (closedir (directory)) 
{
    cout<<"Unable to close  "<<directory_name<<endl;
    exit (1000);
}
}













int main(int argc,char *argv[])
{
char * name;

cout<<"Total Directories  "<< directories<<endl;



name=get_current_dir_name();
cout<<"Current Directory is:  "<<name<<endl;
sem_init(&semaphore1,0,0);
strcpy(searchStringThread,argv[1]);
int number_of_threads=atoi(argv[3]);
pthread_t threads[number_of_threads];




walkThroughDirectory(argv[2],argv[1]);
pthread_mutex_lock(&mutex);
saveDirectory.Enqueue(argv[2]);
pthread_mutex_unlock(&mutex);
sem_post(&semaphore1);

for(int i=0;i<number_of_threads;i++)
{
    pthread_create(&threads[i],NULL,threadHandler,NULL);
}
for(int j=0;j<number_of_threads;j++)
{
    pthread_join(threads[j],NULL);
}
while(saveDirectory.isEmpty())
{
    cout<<"Queue is Empty"<<endl;
    cout<<"Exiting"<<endl;
    exit(10000);
}
free(name);
cout<<"Total Directories  "<< directories<<endl;


return 0;
}
4

1 回答 1

2

有一个简单的错误,你锁定一个互斥锁两次而不是在你完成后解锁它:

        pthread_mutex_lock(&mutex);
        saveDirectory.Enqueue(path);
        pthread_mutex_lock(&mutex);

应该:

        pthread_mutex_lock(&mutex);
        saveDirectory.Enqueue(path);
        pthread_mutex_unlock(&mutex);

注意:这并不是说没有其他问题 - 只是这可能是您的直接问题。

最大的问题是,看起来您将目录放在saveDirectory队列中(因此另一个线程可以将其拉出以处理它),然后在刚刚将其放入队列的线程中递归地继续处理该目录。我认为您需要更多地考虑如何在线程之间分配工作。

还有一些小评论:

  • std::string如果允许,您可能需要考虑使用。它应该使您的一些字符串处理更简单( directoryQueue::Dequeue()例如,您从从返回的数据中泄漏内存)
  • 如果类存在的主要原因 directoryQueue是为多个线程保存工作项,那么也许它应该管理它自己的互斥体,这样调用者就不需要处理这种复杂性
于 2012-04-25T19:11:30.060 回答