2

我在实现具有主线程调用accept()然后将新客户端套接字传递给从线程的服务器时遇到问题(我之前创建了一个固定大小的线程池)。

它的实现方式是,我使用一个简单的整数数组来表示一堆客户端套接字,这些套接字会被成功的accept()调用填满。每个从属线程有一个互斥锁,如果每个从属线程的当前客户端文件描述符设置为-1(它在线程代码中设置为这样,在满足请求结束时),则每个从属线程都是隐式空闲的。

在一个用例中,我有一个 shell 脚本启动给定数量的客户端,例如,六个并发客户端(六个进程)。检查 TCP 连接设置,所有客户端都没有收到任何错误connect()

现在问题来了。在这个有六个客户端的特定测试场景中,服务器只打印了预期六条成功消息中的五条accept()。我不确定,但感觉就像要接受的客户端连接之一只是丢失了!> Server got connection from ...

更奇怪的是,我在接收接受的套接字和将其分派到从线程之间添加了一个continue( )。//MAGICAL TEST有了这个continue,没有从站接收工作,但现在所有客户端连接都按预期打印。我不太确定,但是当主线程向从属线程分派工作时,接受队列中的一个客户端连接是否有可能被丢弃?

编辑:只有当我的从属线程数量低于同时请求的数量时,才会发生这种情况。不过,我不明白为什么在这种情况下主线程似乎没有打印额外的连接之一(例如,2 个从属与 6 个并发请求)。

线程局部数据定义如下:

#define PENDING_CONNS 35
typedef struct _ThreadData {
    pthread_t m_Thread;
    pthread_mutex_t m_Mutex;
    char* m_Task;
    int m_ClientFD;
    unsigned int m_Index;
    unsigned short int m_Networking;
    char* m_OutputDir;
} ThreadData;
static ThreadData *g_Workers = NULL;
static unsigned int g_NumWorkers = 0;
static pthread_barrier_t g_StartBarrier;

然后是主线程部分:

/*
 * Executed by the coordinating thread. Awaits and passes new client connections to free working threads.
 */
int RunServer(char* port) {
    // Listen on sock_fd, new connection on new_fd.
    int sockfd, new_fd;
    struct addrinfo hints, *servinfo, *p;

    // Connector's address information.
    struct timeval timeout;
    struct sockaddr_storage their_addr;
    socklen_t sin_size;

    //  struct sigaction sa; //TODO: possible signal handling later.
    int yes = 1;
    char s[INET6_ADDRSTRLEN];
    int rv;

    memset(&hints, 0, sizeof hints);
    hints.ai_family = AF_UNSPEC;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = AI_PASSIVE; // use my IP

    if ((rv = getaddrinfo(NULL, port, &hints, &servinfo)) != 0) {
        fprintf(stderr, "> Server: getaddrinfo: %s\n", gai_strerror(rv));
        return -1;
    }

    // Loop through all the results and bind to the first we can
    for (p = servinfo; p != NULL; p = p->ai_next) {
        if ((sockfd = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) == -1) {
            perror("> Server: socket");
            continue;
        }

        if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) == -1) {
            perror("> Server: setsockopt");
            return -1;
        }

        if (bind(sockfd, p->ai_addr, p->ai_addrlen) == -1) {
            close(sockfd);
            perror("> Server: bind");
            continue;
        }
        break;
    }

    if (p == NULL) {
        fprintf(stderr, "> Server: failed to bind\n");
        return -1;
    }

    // All done with this structure.
    freeaddrinfo(servinfo);
    //PENDING_CONNS defined as 25
    if (listen(sockfd, PENDING_CONNS) == -1) {
        perror("> Server: listen");
        return -1;
    }

    printf("> Server: waiting for connections...\n");

    fd_set_blocking(sockfd, 0); // NON BLOCKING
    int requestStack[1000];
    int stackTop = 0;

    // Main loop of coordinating thread.
    while (1) {
        sin_size = sizeof their_addr;
        new_fd = accept(sockfd, (struct sockaddr *) &their_addr, &sin_size);
        if (new_fd == -1) {
            //queue was probably empty...
        }
        else {
            inet_ntop(their_addr.ss_family,
                GetInAddress((struct sockaddr *) &their_addr), s, sizeof s);
            printf("> Server: got connection from %s\n", s);

            requestStack[stackTop++] = new_fd;
            printf("> Server: stack top is now %i with value %i.\n", stackTop, requestStack[stackTop-1]);
        }

        // MAGICAL TEST: continue;

        // Linear search for a free worker thread...
        unsigned int i;
        int rc;
        for (i = 0; i < g_NumWorkers && stackTop > 0; i++) {
            rc = pthread_mutex_trylock(&(g_Workers[i].m_Mutex));
            if (rc == EBUSY)  // If it was already locked, skip worker thread.
                continue;
            else if (!rc) { // If we managed to lock the thread's mutex...
                if (g_Workers[i].m_ClientFD == -1) { // Check if it is currently out of work.
                    g_Workers[i].m_ClientFD = requestStack[--stackTop];
                    printf("> Server: master thread assigned fd %i to thread %u\n", requestStack[stackTop], i);
                }
                rc = pthread_mutex_unlock(&(g_Workers[i].m_Mutex));
                if (rc) {
                    printf("> Server: main thread mutex unlock failed on thread %i with error %i!\n", i, rc);
                    return -1;
                }
            }
            else
                printf("> Server: main thread try-lock failed on thread %i with error %i!\n", i, rc);
        }
    }
    return 0;
}

对于代码的冗长以及可能错过手册页中的任何基本信息,我深表歉意。

根据要求,这里是从线程函数:

static void *WorkRoutine(void *args) {
    ThreadData * const threadData = (ThreadData*) args;
    pthread_mutex_t * const tMutex = &(threadData->m_Mutex);
    const unsigned int tIndex = threadData->m_Index;
    int clientFD = threadData->m_ClientFD;

    // Each thread initializes its own mutex.
    int rc = pthread_mutex_init(tMutex, NULL);
    if (!rc)
        printf("> Slave %u: mutex %p initialization was sucessful!\n", tIndex, tMutex);
    if (rc && rc != PTHREAD_BARRIER_SERIAL_THREAD) {
        printf("> Slave %u: failed mutex initialization!\n", tIndex);
        perror("pthread_mutex_init");
        exit(-1); //TODO: implement graceful error exit.
    }

    // Synchronization point: all threads including the master thread wait for all slave mutexes to be initialized.
    rc = pthread_barrier_wait(&g_StartBarrier);
    if (rc && rc != PTHREAD_BARRIER_SERIAL_THREAD) {
        printf("> Slave %u: failed initial barrier synchronization!\n", tIndex);
        perror("pthread_barrier_wait");
        exit(-1); //TODO: implement graceful error exit.
    }

    while (1) { //TODO replace 1 with state var that is turned off when the master thread receives a request to shutdown server.
        rc = pthread_mutex_lock(tMutex);
        if (rc) {
            printf("> Slave %u: mutex lock returned an error: %i!\n", tIndex, rc);
            perror("pthread_mutex_lock()");
            continue;
        }
        clientFD = threadData->m_ClientFD;
        if (!threadData->m_Task && clientFD == -1) {
            rc = pthread_mutex_unlock(tMutex);
            int yieldStatus = sched_yield();
            if (yieldStatus) {
                printf("> Slave %u: error while attempting to yield!\n", tIndex);
                perror("sched_yield()\n");
            }
            continue;
            //printf("> Slave %u: yielding.\n", tIndex);
        }
        else if (!threadData->m_Task && clientFD != -1) {
            // Read client request.
            threadData->m_Task = (char*) calloc(FILE_BUFFER, sizeof(char)); //TODO: stop allocating buffer on every task.
            printf("> Slave %u going to read from %i into %p\n", tIndex, clientFD, threadData->m_Task);

            MessageHeader h;
            GetHeader(&h, clientFD);
            if (h.m_ID != BeginSession) {
                //protocol implementation error
                exit(-1);
            }
            printf("> Slave %u: expecting client command of length %i\n", tIndex, h.m_ContentSize);
            int n = GetContent((void*) threadData->m_Task, h.m_ContentSize * sizeof(char), clientFD);
            printf("> Slave %u: client of file descriptor %i sent [%s] at a total of %i bytes\n", tIndex, clientFD, threadData->m_Task, n);
            short int remoteOperation;

            int baseArgvSize = 10;
            char **argv = (char**) calloc(baseArgvSize, sizeof(char*)); //TODO: stop allocating table on every task.
            int argc = CmdToTable(threadData->m_Task, &argv, &baseArgvSize);

            int localLen = strlen("local");
            int remoteLen = strlen("remote");

            if(!strncmp(threadData->m_Task, "remote", remoteLen)) {
                remoteOperation = 1;
                g_Workers[tIndex].m_Networking = 1;

            }
            else if(!strncmp(threadData->m_Task, "local", localLen)) {
                remoteOperation = 0;
                g_Workers[tIndex].m_Networking = 0;
                g_Workers[tIndex].m_OutputDir = (char*) calloc(FILE_BUFFER, sizeof(char));

                MessageHeader hPath;
                GetHeader(&hPath, clientFD);
                if (hPath.m_ID != SendMessage) {
                    //protocol implementation error
                    exit(-1);
                }

                int cnt = GetContent((void*) g_Workers[tIndex].m_OutputDir, hPath.m_ContentSize * sizeof(char), clientFD);
                g_Workers[tIndex].m_OutputDir[hPath.m_ContentSize * sizeof(char)] = '/';
                printf("> Slave %u: received message with %i bytes.\n", tIndex, cnt);
                printf("> Slave %u: client output going to %s\n", tIndex, g_Workers[tIndex].m_OutputDir);
            }
            else {
                printf("> Slave %u: received bogus client command: %s\n", tIndex, threadData->m_Task);
                return NULL;
            }
            printf("> Slave %u: remote mode - %i.\n", tIndex, g_Workers[tIndex].m_Networking);
            // Debug print the table built from the client command.
            printf("> Slave %u: table has %i entries.\n", tIndex, argc);
            int i;
            for (i = 0; i < argc; i++) {
                printf("> Slave %u: argument %i is %s.\n", tIndex, i, argv[i]);
            }
            // Prepare the input files the client will send.
            char**filenames = (char**) calloc(argc, sizeof(char*));
            FILE**files = (FILE**) calloc(argc, sizeof(FILE*));
            unsigned int reqIDIndex = 0;
            unsigned int reqIDLen = strlen(argv[reqIDIndex]);

            if(!remoteOperation)
                continue;

            // Check which files need to be sent from the client.
            for (i = reqIDIndex + 1; i < argc; i++) {
                unsigned int argSize = strlen(argv[i]);
                if (strstr(argv[i], FILETYPE_SEPARATOR) != NULL
                        && strstr(argv[i], INDEX_FILETYPE) == NULL) {
                    // If the current argument has a file type and isn't an index type.
                    filenames[i] = calloc(reqIDLen + argSize + 2, sizeof(char)); // +2 to account for terminator char and hyphen between reqID and filename

                    printf("> Slave %u: arg %s check with req %s\n", tIndex, argv[i], argv[reqIDIndex]);
                    int noReqYet = strncmp(argv[i], argv[reqIDIndex], reqIDLen);
                    if (noReqYet) {
                        printf("> Slave %u: no prepended request yet!\n", tIndex);
                        strncpy(filenames[i], argv[reqIDIndex], reqIDLen);
                        filenames[i][reqIDLen] = '-';
                    }
                    // If the argument is a path to a file, get the filename and discard the path (that was local to the client).
                    char*nameStart = strrchr(argv[i], '/'); //TODO: make a define for 92: backslash ascii char number
                    printf("> Slave %u: going to remove backslash from %s\n", tIndex, argv[i]);

                    if (nameStart != NULL )
                        nameStart++;
                    else
                        nameStart = argv[i];

                    printf("> Slave %u: got %s after removing backslash\n", tIndex, nameStart);
                    printf("noReqYet: %i\n", noReqYet);
                    if (!noReqYet)
                        strncpy(filenames[i], nameStart, strlen(nameStart));
                    else
                        strncpy(filenames[i] + reqIDLen + 1, nameStart,
                                strlen(nameStart)); //+1 to account for the hyphen.
                    // When freeing the argv table, only need to free(argv). There is no need to free argv[i] elements.
                    argv[i] = filenames[i];
                }
                else if (strstr(argv[i], FILETYPE_SEPARATOR) != NULL && strstr(argv[i], INDEX_FILETYPE) != NULL ) {
                    // If the current argument is the index type.
                    char*nameStart = strrchr(argv[i], '/');
                    if (nameStart != NULL ) {
                        nameStart++;
                        argv[i] = nameStart;
                    }
                }
            }

            // Debug print the needed files.
            printf("> Slave %u: awaiting files:", tIndex);
            for (i = 0; i < argc; i++) {
                if (filenames[i] != NULL ) {
                    printf(" %s", filenames[i]);
                }
            }
            printf("\n");

            // Await each input file from the client.
            for (i = 0; i < argc; i++) {
                if (filenames[i] != NULL ) {
                    files[i] = GetFile(clientFD, filenames[i]); // Note: GetFile invokes fclose!
                    if (files[i])
                        printf("> Slave %u: locally stored file %s\n", tIndex, filenames[i]);
                }
            }
        }
        printf("> Slave %u: going to invoke SatisfyRequest with arguments: ", tIndex);
        for (i = reqIDIndex; i < argc; i++) {
            if (argv[i] != NULL )
                printf("%s ", argv[i]);
        }
        printf("\n");

        // Process the request against the index.
        int res = SatisfyRequest(argc, argv, tIndex, clientFD);

        char* dummy;
        if (!res)
            dummy = "Your request succeeded.";
        else
            dummy = "Sorry, there was an error.";

        // Terminate session with the client.
        SendMsg(EndSession, (void*)dummy, strlen(dummy), clientFD);
        printf("> Slave %u: task result delivered to client of file descriptor %i.\n", tIndex, clientFD);
        close(clientFD);
        free(threadData->m_Task);
        free(argv);
        if(!remoteOperation) {
            free(g_Workers[tIndex].m_OutputDir);
        }
        // Free received file names and file pointer array.
        for (i = 0; i < argc; i++) {
            if (filenames[i])
                free(filenames[i]);
        }
        free(filenames);
        free(files);

        // Reset thread-local data.
        threadData->m_Task = NULL;
        threadData->m_ClientFD = -1;
        clientFD = -1;

        printf("> Slave %u: task finished.\n", tIndex);
        rc = pthread_mutex_unlock(tMutex);
        if(!rc)
            continue;
    }
    if (rc) {
        printf("> Slave %u: mutex unlock returned an error: %i!\n", tIndex, rc);
        perror("pthread_mutex_unlock()");
    }
    return NULL;
}

这是代码的客户端部分:

char*cmdBuffer = (char*)calloc(FILE_BUFFER, sizeof(char));
char*cmdRealPath = (char*)calloc(FILE_BUFFER, sizeof(char));
int auxNameTableSz = 25;
char**auxNameTable = (char**)calloc(auxNameTableSz, sizeof(char*));

char* ip = argv[2];
char* port = argv[3];
char* networkMode = argv[4];
//  char* reqID = argv[5];
const int cmdStartIndex = 6;
short int remoteOperation;

if(!strcmp(networkMode, "remote")) {
    remoteOperation = 1;
    auxNameTableSz = 0;
}
else if(!strcmp(networkMode, "local")) {
    // If the client is on the server's machine, need to convert argument files to absolute paths.
    remoteOperation = 0;
    int i = cmdStartIndex;
    int j = 0;
    while(i < argc) {
        if(strrchr(argv[i], '/') || strrchr(argv[i], '.')) {
            argv[i] = realpath(argv[i], NULL);
            auxNameTable[j++] = argv[i];
        }
        i++;
    }
    auxNameTableSz = j;
}
else {
    printf("> Client: argument four must be either \"global\" or \"local\"!");
    free(cmdBuffer);
    free(cmdRealPath);
    FreePathsTable(&auxNameTable, auxNameTableSz);
    DeallocateGlobals();
    return 0;
}

// Concatenate client arguments into a single string.
MakeCommand(argc, argv, cmdBuffer);
printf("> Client: [Command]: %s\n", cmdBuffer);

// Establish a (TCP) connection with the server.
if(PrepareConnection(ip, port)) {
    // Failed to prepare connection.
    free(cmdBuffer);
    free(cmdRealPath);
    FreePathsTable(&auxNameTable, auxNameTableSz);
  DeallocateGlobals();
    return 0;
}
if(ConnectToServer()) {
    // Failed to establish a stream socket connection.
    free(cmdBuffer);
    free(cmdRealPath);
    FreePathsTable(&auxNameTable, auxNameTableSz);
    DeallocateGlobals();
    return 0;
}

// Send the client command to the server.
printf("> Client: sending command of %i bytes to server: [%s].\n", (int)strlen(cmdBuffer), cmdBuffer);
int count=0;
if((count=WriteToServer(BeginSession, cmdBuffer)) == -1) {
    // Command send failed.
    free(cmdBuffer);
    free(cmdRealPath);
    FreePathsTable(&auxNameTable, auxNameTableSz);
    DeallocateGlobals();
    return 0;
}
printf("> Client sent command to the server at %i bytes.\n", count);
if(remoteOperation) {
        // Send the correct input files to the server.
        SendInputFiles(argc, argv, cmdStartIndex);
}
else {
    // Send the real path of the client's working directory to the server.
    char *localDir = realpath(".", NULL);
    count=WriteToServer(SendMessage, localDir);
printf("> Client sent the real path to the server at %i bytes.\n", count);
    free(localDir);
}

// Await reply from the server.
char *buffer = (char*) calloc(FILE_BUFFER, sizeof(char));
count = ReceiveFromServer(buffer);

char *backPtr = buffer;

if(remoteOperation) {
    printf("> Client: got output list [%s] totalling %i bytes.\n", buffer, count);
    // Receive the files produced by the server if the client and server are not on the same machine.
    ReceiveOutputFiles(buffer);

    // Await session termination.
        buffer = backPtr;
        memset(buffer, 0, sizeof(char)*FILE_BUFFER);
        count = ReceiveFromServer(buffer);

}
else {

}

printf("> Client: server response: [%s]-[%s]\n", buffer, cmdBuffer);
4

1 回答 1

1

发布这个答案已经过期了,但就是这样。

在函数中使用局部clientFD变量WorkRoutine会干扰线程同步的逻辑......我暂时使用该变量是为了便于阅读,但我忽略了它可能会干扰线程的正确操作的事实。我将其替换为直接使用线程结构 FD 变量,现在它正在工作 ( threadData->m_ClientFD)。

我在下面展示最终代码:

static void *WorkRoutine(void *args) {
// Easier access of thread-local data.
ThreadData * const threadData = (ThreadData*) args;
pthread_mutex_t * const tMutex = &(threadData->m_Mutex);
const unsigned int tIndex = threadData->m_Index;

threadData->m_StatIndex = 0;
threadData->m_StatCount = BASE_TASK_COUNT;
threadData->m_OutputDir = NULL;
threadData->errorFileName = (char*) calloc(FILE_BUFFER, sizeof(char));

char *logName = (char*) calloc(FILE_BUFFER, sizeof(char));
const char *baseName = "./log-t";
size_t baseNameSize = strlen(baseName);
memcpy(logName, baseName, baseNameSize);
sprintf(logName + baseNameSize, "%u", threadData->m_Index);
threadData->m_Log = stdout;
#if defined LOGGING
threadData->m_Log = fopen(logName, "w");
#endif


fprintf(threadData->m_Log, "> Slave %u: log file %s at pointer %p.\n", tIndex, logName, threadData->m_Log);

// Each thread initializes its own mutex.
int rc = pthread_mutex_init(tMutex, NULL);
if (!rc)
    fprintf(threadData->m_Log, "> Slave %u: mutex %p initialization was successful!\n", tIndex, tMutex);
if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) {
    fprintf(threadData->m_Log, "> Slave %u: failed mutex initialization!\n", tIndex);
    perror("pthread_mutex_init");
    exit(-1);
}

// Each thread will also need to find out its own clock id.
int clkStatus = pthread_getcpuclockid(threadData->m_Thread, &(threadData->m_CID));
if (clkStatus != 0)
    handle_error_en(clkStatus, "pthread_getcpuclockid");


// Synchronization point: all threads including the master thread wait for all slave mutexes to be initialized.
rc = pthread_barrier_wait(&g_StartBarrier);
if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) {
    fprintf(threadData->m_Log, "> Slave %u: failed initial barrier synchronization!\n", tIndex);
    perror("pthread_barrier_wait");
    exit(-1); 
}

while (1) { 
    threadData->m_OutputDir = NULL;

    // Check if server was ordered to shutdown.
    rc = pthread_mutex_lock(&g_ShutdownLock);
    if(!rc) {
        if(g_ShutdownOrdered) {
            pthread_mutex_unlock(&g_ShutdownLock);
            break;
        }
        else {
            pthread_mutex_unlock(&g_ShutdownLock);
        }
    }
    else {
        fprintf(threadData->m_Log, "> Slave %u: failed on mutex lock of shutdown lock: %i\n", tIndex, rc);
        perror("slave: pthread_mutex_lock");
    }


    rc = pthread_mutex_lock(tMutex);
    if (!rc) {
        if (!threadData->m_Task && threadData->m_ClientFD == -1) {
            rc = pthread_mutex_unlock(tMutex);
            int yieldStatus = sched_yield();
            if (yieldStatus) {
                fprintf(threadData->m_Log, "> Slave %u: error while attempting to yield!\n", tIndex);
                perror("sched_yield()\n");
            }
            continue;
            //printf("> Slave %u: yielding.\n", tIndex);
        }
        else if (!threadData->m_Task && threadData->m_ClientFD != -1) {
            // Read client request.
            threadData->m_Task = (char*) calloc(FILE_BUFFER, sizeof(char)); 
            memset(threadData->m_Task, 0, FILE_BUFFER * sizeof(char));
            fprintf(threadData->m_Log, "> Slave %u: going to read from %i into %p\n", tIndex, threadData->m_ClientFD, threadData->m_Task);

            MessageHeader h;
            GetHeader(&h, threadData->m_ClientFD);


            // In case we received an order to kill the server.
            if(h.m_ID == CloseServer) {
                fprintf(threadData->m_Log, "> Slave %u: got order to close server!\n", tIndex);
                rc = pthread_mutex_lock(&g_ShutdownLock);
                if(!rc) {
                    g_ShutdownOrdered = 1;
                    free(threadData->m_Task);
                    pthread_mutex_unlock(&g_ShutdownLock);
                    // Reset thread-local data.
                    threadData->m_Task = NULL;
                    close(threadData->m_ClientFD);
                    threadData->m_ClientFD = -1;
                    break;
                }
                else {
                    fprintf(threadData->m_Log, "> Slave %u: failed locking of shutdown lock: %i", tIndex, rc);
                    perror("pthread_mutex_lock");
                }
            }

            if (h.m_ID != BeginSession) {
                //protocol implementation error
                exit(-1);
            }


            clock_gettime(threadData->m_CID, &(threadData->m_PrevTime));
            fprintf(threadData->m_Log, "> Slave %u: expecting client command of length %i\n", tIndex, h.m_ContentSize);
            int n = GetContent((void*) threadData->m_Task, h.m_ContentSize * sizeof(char), threadData->m_ClientFD);
            fprintf(threadData->m_Log, "> Slave %u: client of file descriptor %i sent [%s] at a total of %i bytes\n", tIndex, threadData->m_ClientFD, threadData->m_Task, n);

            int baseArgvSize = 10;
            char **argv = (char**) calloc(baseArgvSize, sizeof(char*)); 
            int argc = CmdToTable(threadData->m_Task, &argv, &baseArgvSize, threadData->m_Log);

            int localLen = strlen("local");
            int remoteLen = strlen("remote");

            if(!strncmp(threadData->m_Task, "remote", remoteLen)) {
                g_Workers[tIndex].m_Networking = 1;
            }
            else if(!strncmp(threadData->m_Task, "local", localLen)) {
                g_Workers[tIndex].m_Networking = 0;
                g_Workers[tIndex].m_OutputDir = (char*) calloc(FILE_BUFFER, sizeof(char));

                MessageHeader hPath;
                GetHeader(&hPath, threadData->m_ClientFD);
                if (hPath.m_ID != SendMessage) {
                    //protocol implementation error
                    exit(-1);
                }


                int cnt = GetContent((void*) threadData->m_OutputDir, hPath.m_ContentSize * sizeof(char), threadData->m_ClientFD);
                printf("> Slave %u: I RECEIVED %s AS OUTPUT DIRECTORY!\n\n\n\n\n", tIndex, threadData->m_OutputDir);
                threadData->m_OutputDir[hPath.m_ContentSize * sizeof(char)] = '/';



                fprintf(threadData->m_Log, "> Slave %u: received output directory message with %i bytes.\n", tIndex, cnt);
                fprintf(threadData->m_Log, "> Slave %u: client output going to %s\n", tIndex, threadData->m_OutputDir);
            }
            else {
                fprintf(threadData->m_Log, "> Slave %u: received bogus client command: %s\n", tIndex, threadData->m_Task);
                return NULL;
            }
            fprintf(threadData->m_Log, "> Slave %u: remote mode - %i.\n", tIndex, g_Workers[tIndex].m_Networking);
            // Debug print the table built from the client command.
            fprintf(threadData->m_Log, "> Slave %u: table has %i entries.\n", tIndex, argc);
            int i;
            for (i = 0; i < argc; i++) {
                fprintf(threadData->m_Log, "> Slave %u: argument %i is %s.\n", tIndex, i, argv[i]);
            }
            // Prepare the input files the client will send.
            char**filenames = (char**) calloc(argc, sizeof(char*));
            FILE**files = (FILE**) calloc(argc, sizeof(FILE*));
            unsigned int reqIDIndex = 0;
            unsigned int reqIDLen = strlen(argv[reqIDIndex]);
            threadData->m_REQ = argv[reqIDIndex];
            if(threadData->m_Networking) {
                // Check which files need to be sent from the client.
                for (i = reqIDIndex + 1; i < argc; i++) {
                    unsigned int argSize = strlen(argv[i]);
                    if (strstr(argv[i], FILETYPE_SEPARATOR) != NULL
                            && strstr(argv[i], INDEX_FILETYPE) == NULL) {
                        // If the current argument has a file type and isn't an index type.
                        filenames[i] = calloc(reqIDLen + argSize + 2, sizeof(char)); // +2 to account for terminator char and hyphen between reqID and filename

                        fprintf(threadData->m_Log, "> Slave %u: arg %s check with req %s\n", tIndex, argv[i], argv[reqIDIndex]);
                        int noReqYet = strncmp(argv[i], argv[reqIDIndex], reqIDLen);
                        if (noReqYet) {
                            fprintf(threadData->m_Log, "> Slave %u: no prepended request yet!\n", tIndex);
                            strncpy(filenames[i], argv[reqIDIndex], reqIDLen);
                            filenames[i][reqIDLen] = '-';
                        }
                        // If the argument is a path to a file, get the filename and discard the path (that was local to the client).
                        char*nameStart = strrchr(argv[i], '/'); 
                        fprintf(threadData->m_Log, "> Slave %u: going to remove backslash from %s\n", tIndex, argv[i]);

                        if (nameStart != NULL )
                            nameStart++;
                        else
                            nameStart = argv[i];

                        fprintf(threadData->m_Log, "> Slave %u: got %s after removing backslash\n", tIndex, nameStart);
                        fprintf(threadData->m_Log, "noReqYet: %i\n", noReqYet);
                        if (!noReqYet)
                            strncpy(filenames[i], nameStart, strlen(nameStart));
                        else
                            strncpy(filenames[i] + reqIDLen + 1, nameStart,
                                    strlen(nameStart)); //+1 to account for the hyphen.
                        // When freeing the argv table, only need to free(argv). There is no need to free argv[i] elements.
                        argv[i] = filenames[i];
                    } else if (strstr(argv[i], FILETYPE_SEPARATOR) != NULL
                            && strstr(argv[i], INDEX_FILETYPE) != NULL ) {
                        // If the current argument is the index type.
                        char*nameStart = strrchr(argv[i], '/');
                        if (nameStart != NULL ) {
                            nameStart++;
                            argv[i] = nameStart;
                        }
                    }
                }

                // Debug print the needed files.
                fprintf(threadData->m_Log, "> Slave %u: awaiting files:", tIndex);
                for (i = 0; i < argc; i++) {
                    if (filenames[i] != NULL ) {
                        fprintf(threadData->m_Log, " %s", filenames[i]);
                    }
                }
                fprintf(threadData->m_Log, "\n");

                // Await each input file from the client.
                for (i = 0; i < argc; i++) {
                    if (filenames[i] != NULL ) {
                        files[i] = GetFile(threadData->m_ClientFD, filenames[i]); // Note: GetFile invokes fclose!
                        if (files[i])
                            fprintf(threadData->m_Log, "> Slave %u: locally stored file %s\n", tIndex, filenames[i]);
                    }
                }
            }
            fprintf(threadData->m_Log, "> Slave %u: going to invoke SatisfyRequest with arguments: ", tIndex);
            for (i = reqIDIndex; i < argc; i++) {
                if (argv[i] != NULL )
                    fprintf(threadData->m_Log, "%s ", argv[i]);
            }
            fprintf(threadData->m_Log, "\n");

            // Process the request against the index.
            int res = SatisfyRequest(argc, argv, tIndex, threadData->m_ClientFD);
            char success = 0;
            if(!res)
                success = 1;

            // Terminate session with the client.
            SendMsg(EndSession, (void*)&success, sizeof(success), threadData->m_ClientFD);
            fprintf(threadData->m_Log, "> Slave %u: task result delivered to client of file descriptor %i.\n", tIndex, threadData->m_ClientFD);
            close(threadData->m_ClientFD);

            free(argv);
            threadData->m_REQ = NULL;
            if(!threadData->m_Networking) {
                free(threadData->m_OutputDir);
                threadData->m_OutputDir = NULL;
            }

            // Free received file names and file pointer array.
            for (i = 0; i < argc; i++) {
                if (filenames[i]) {
                    free(filenames[i]);
                    filenames[i] = NULL;
                }
            }
            free(filenames);
            free(files);
            filenames = NULL;
            files = NULL;
            // Print out the time it took for the task.
            struct timespec tempTime;
            clock_gettime(threadData->m_CID, &tempTime);

            struct timespec timeDelta;
            timeDelta.tv_nsec = tempTime.tv_nsec - threadData->m_PrevTime.tv_nsec;
            timeDelta.tv_sec = tempTime.tv_sec - threadData->m_PrevTime.tv_sec;

            if(timeDelta.tv_nsec < 0)
                timeDelta.tv_nsec *= -1;
            if(timeDelta.tv_sec < 0)
                timeDelta.tv_sec *= -1;

            RegisterStat(tIndex, threadData->m_StatIndex++, timeDelta, threadData->m_Task);

            fprintf(threadData->m_Log, "Slave %u: %4ld.%03ld\n", tIndex, timeDelta.tv_sec,timeDelta.tv_nsec / 1000000);
            memcpy(&(threadData->m_PrevTime), &tempTime, sizeof(struct timespec));

            // Reset thread-local data.
            memset(threadData->m_Task, 0, FILE_BUFFER * sizeof(char));
            free(threadData->m_Task);
            threadData->m_Task = NULL;
            threadData->m_ClientFD = -1;
            fprintf(threadData->m_Log, "> Slave %u: task finished.\n", tIndex);
            rc = pthread_mutex_unlock(tMutex);
        }
        if (rc) {
            fprintf(threadData->m_Log, "> Slave %u: mutex unlock returned an error: %i!\n", tIndex, rc);
            perror("pthread_mutex_unlock()");
        }
    } else {
        fprintf(threadData->m_Log, "> Slave %u: mutex lock returned an error: %i!\n", tIndex, rc);
        perror("pthread_mutex_lock()");
    }
}
free(threadData->errorFileName);
threadData->errorFileName = NULL;
free(logName);
logName = NULL;
fprintf(threadData->m_Log, "> Slave %u: exiting...\n", tIndex);

#if defined LOGGING
    fclose(threadData->m_Log);
#endif
threadData->m_Log = NULL;

return 0;
}
于 2014-02-25T00:15:37.633 回答