我在实现具有主线程调用accept()
然后将新客户端套接字传递给从线程的服务器时遇到问题(我之前创建了一个固定大小的线程池)。
它的实现方式是,我使用一个简单的整数数组来表示一堆客户端套接字,这些套接字会被成功的accept()
调用填满。每个从属线程有一个互斥锁,如果每个从属线程的当前客户端文件描述符设置为-1
(它在线程代码中设置为这样,在满足请求结束时),则每个从属线程都是隐式空闲的。
在一个用例中,我有一个 shell 脚本启动给定数量的客户端,例如,六个并发客户端(六个进程)。检查 TCP 连接设置,所有客户端都没有收到任何错误connect()
。
现在问题来了。在这个有六个客户端的特定测试场景中,服务器只打印了预期六条成功消息中的五条accept()
。我不确定,但感觉就像要接受的客户端连接之一只是丢失了!> Server got connection from ...
更奇怪的是,我在接收接受的套接字和将其分派到从线程之间添加了一个continue
( )。//MAGICAL TEST
有了这个continue
,没有从站接收工作,但现在所有客户端连接都按预期打印。我不太确定,但是当主线程向从属线程分派工作时,接受队列中的一个客户端连接是否有可能被丢弃?
编辑:只有当我的从属线程数量低于同时请求的数量时,才会发生这种情况。不过,我不明白为什么在这种情况下主线程似乎没有打印额外的连接之一(例如,2 个从属与 6 个并发请求)。
线程局部数据定义如下:
#define PENDING_CONNS 35
typedef struct _ThreadData {
pthread_t m_Thread;
pthread_mutex_t m_Mutex;
char* m_Task;
int m_ClientFD;
unsigned int m_Index;
unsigned short int m_Networking;
char* m_OutputDir;
} ThreadData;
static ThreadData *g_Workers = NULL;
static unsigned int g_NumWorkers = 0;
static pthread_barrier_t g_StartBarrier;
然后是主线程部分:
/*
* Executed by the coordinating thread. Awaits and passes new client connections to free working threads.
*/
int RunServer(char* port) {
// Listen on sock_fd, new connection on new_fd.
int sockfd, new_fd;
struct addrinfo hints, *servinfo, *p;
// Connector's address information.
struct timeval timeout;
struct sockaddr_storage their_addr;
socklen_t sin_size;
// struct sigaction sa; //TODO: possible signal handling later.
int yes = 1;
char s[INET6_ADDRSTRLEN];
int rv;
memset(&hints, 0, sizeof hints);
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_PASSIVE; // use my IP
if ((rv = getaddrinfo(NULL, port, &hints, &servinfo)) != 0) {
fprintf(stderr, "> Server: getaddrinfo: %s\n", gai_strerror(rv));
return -1;
}
// Loop through all the results and bind to the first we can
for (p = servinfo; p != NULL; p = p->ai_next) {
if ((sockfd = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) == -1) {
perror("> Server: socket");
continue;
}
if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) == -1) {
perror("> Server: setsockopt");
return -1;
}
if (bind(sockfd, p->ai_addr, p->ai_addrlen) == -1) {
close(sockfd);
perror("> Server: bind");
continue;
}
break;
}
if (p == NULL) {
fprintf(stderr, "> Server: failed to bind\n");
return -1;
}
// All done with this structure.
freeaddrinfo(servinfo);
//PENDING_CONNS defined as 25
if (listen(sockfd, PENDING_CONNS) == -1) {
perror("> Server: listen");
return -1;
}
printf("> Server: waiting for connections...\n");
fd_set_blocking(sockfd, 0); // NON BLOCKING
int requestStack[1000];
int stackTop = 0;
// Main loop of coordinating thread.
while (1) {
sin_size = sizeof their_addr;
new_fd = accept(sockfd, (struct sockaddr *) &their_addr, &sin_size);
if (new_fd == -1) {
//queue was probably empty...
}
else {
inet_ntop(their_addr.ss_family,
GetInAddress((struct sockaddr *) &their_addr), s, sizeof s);
printf("> Server: got connection from %s\n", s);
requestStack[stackTop++] = new_fd;
printf("> Server: stack top is now %i with value %i.\n", stackTop, requestStack[stackTop-1]);
}
// MAGICAL TEST: continue;
// Linear search for a free worker thread...
unsigned int i;
int rc;
for (i = 0; i < g_NumWorkers && stackTop > 0; i++) {
rc = pthread_mutex_trylock(&(g_Workers[i].m_Mutex));
if (rc == EBUSY) // If it was already locked, skip worker thread.
continue;
else if (!rc) { // If we managed to lock the thread's mutex...
if (g_Workers[i].m_ClientFD == -1) { // Check if it is currently out of work.
g_Workers[i].m_ClientFD = requestStack[--stackTop];
printf("> Server: master thread assigned fd %i to thread %u\n", requestStack[stackTop], i);
}
rc = pthread_mutex_unlock(&(g_Workers[i].m_Mutex));
if (rc) {
printf("> Server: main thread mutex unlock failed on thread %i with error %i!\n", i, rc);
return -1;
}
}
else
printf("> Server: main thread try-lock failed on thread %i with error %i!\n", i, rc);
}
}
return 0;
}
对于代码的冗长以及可能错过手册页中的任何基本信息,我深表歉意。
根据要求,这里是从线程函数:
static void *WorkRoutine(void *args) {
ThreadData * const threadData = (ThreadData*) args;
pthread_mutex_t * const tMutex = &(threadData->m_Mutex);
const unsigned int tIndex = threadData->m_Index;
int clientFD = threadData->m_ClientFD;
// Each thread initializes its own mutex.
int rc = pthread_mutex_init(tMutex, NULL);
if (!rc)
printf("> Slave %u: mutex %p initialization was sucessful!\n", tIndex, tMutex);
if (rc && rc != PTHREAD_BARRIER_SERIAL_THREAD) {
printf("> Slave %u: failed mutex initialization!\n", tIndex);
perror("pthread_mutex_init");
exit(-1); //TODO: implement graceful error exit.
}
// Synchronization point: all threads including the master thread wait for all slave mutexes to be initialized.
rc = pthread_barrier_wait(&g_StartBarrier);
if (rc && rc != PTHREAD_BARRIER_SERIAL_THREAD) {
printf("> Slave %u: failed initial barrier synchronization!\n", tIndex);
perror("pthread_barrier_wait");
exit(-1); //TODO: implement graceful error exit.
}
while (1) { //TODO replace 1 with state var that is turned off when the master thread receives a request to shutdown server.
rc = pthread_mutex_lock(tMutex);
if (rc) {
printf("> Slave %u: mutex lock returned an error: %i!\n", tIndex, rc);
perror("pthread_mutex_lock()");
continue;
}
clientFD = threadData->m_ClientFD;
if (!threadData->m_Task && clientFD == -1) {
rc = pthread_mutex_unlock(tMutex);
int yieldStatus = sched_yield();
if (yieldStatus) {
printf("> Slave %u: error while attempting to yield!\n", tIndex);
perror("sched_yield()\n");
}
continue;
//printf("> Slave %u: yielding.\n", tIndex);
}
else if (!threadData->m_Task && clientFD != -1) {
// Read client request.
threadData->m_Task = (char*) calloc(FILE_BUFFER, sizeof(char)); //TODO: stop allocating buffer on every task.
printf("> Slave %u going to read from %i into %p\n", tIndex, clientFD, threadData->m_Task);
MessageHeader h;
GetHeader(&h, clientFD);
if (h.m_ID != BeginSession) {
//protocol implementation error
exit(-1);
}
printf("> Slave %u: expecting client command of length %i\n", tIndex, h.m_ContentSize);
int n = GetContent((void*) threadData->m_Task, h.m_ContentSize * sizeof(char), clientFD);
printf("> Slave %u: client of file descriptor %i sent [%s] at a total of %i bytes\n", tIndex, clientFD, threadData->m_Task, n);
short int remoteOperation;
int baseArgvSize = 10;
char **argv = (char**) calloc(baseArgvSize, sizeof(char*)); //TODO: stop allocating table on every task.
int argc = CmdToTable(threadData->m_Task, &argv, &baseArgvSize);
int localLen = strlen("local");
int remoteLen = strlen("remote");
if(!strncmp(threadData->m_Task, "remote", remoteLen)) {
remoteOperation = 1;
g_Workers[tIndex].m_Networking = 1;
}
else if(!strncmp(threadData->m_Task, "local", localLen)) {
remoteOperation = 0;
g_Workers[tIndex].m_Networking = 0;
g_Workers[tIndex].m_OutputDir = (char*) calloc(FILE_BUFFER, sizeof(char));
MessageHeader hPath;
GetHeader(&hPath, clientFD);
if (hPath.m_ID != SendMessage) {
//protocol implementation error
exit(-1);
}
int cnt = GetContent((void*) g_Workers[tIndex].m_OutputDir, hPath.m_ContentSize * sizeof(char), clientFD);
g_Workers[tIndex].m_OutputDir[hPath.m_ContentSize * sizeof(char)] = '/';
printf("> Slave %u: received message with %i bytes.\n", tIndex, cnt);
printf("> Slave %u: client output going to %s\n", tIndex, g_Workers[tIndex].m_OutputDir);
}
else {
printf("> Slave %u: received bogus client command: %s\n", tIndex, threadData->m_Task);
return NULL;
}
printf("> Slave %u: remote mode - %i.\n", tIndex, g_Workers[tIndex].m_Networking);
// Debug print the table built from the client command.
printf("> Slave %u: table has %i entries.\n", tIndex, argc);
int i;
for (i = 0; i < argc; i++) {
printf("> Slave %u: argument %i is %s.\n", tIndex, i, argv[i]);
}
// Prepare the input files the client will send.
char**filenames = (char**) calloc(argc, sizeof(char*));
FILE**files = (FILE**) calloc(argc, sizeof(FILE*));
unsigned int reqIDIndex = 0;
unsigned int reqIDLen = strlen(argv[reqIDIndex]);
if(!remoteOperation)
continue;
// Check which files need to be sent from the client.
for (i = reqIDIndex + 1; i < argc; i++) {
unsigned int argSize = strlen(argv[i]);
if (strstr(argv[i], FILETYPE_SEPARATOR) != NULL
&& strstr(argv[i], INDEX_FILETYPE) == NULL) {
// If the current argument has a file type and isn't an index type.
filenames[i] = calloc(reqIDLen + argSize + 2, sizeof(char)); // +2 to account for terminator char and hyphen between reqID and filename
printf("> Slave %u: arg %s check with req %s\n", tIndex, argv[i], argv[reqIDIndex]);
int noReqYet = strncmp(argv[i], argv[reqIDIndex], reqIDLen);
if (noReqYet) {
printf("> Slave %u: no prepended request yet!\n", tIndex);
strncpy(filenames[i], argv[reqIDIndex], reqIDLen);
filenames[i][reqIDLen] = '-';
}
// If the argument is a path to a file, get the filename and discard the path (that was local to the client).
char*nameStart = strrchr(argv[i], '/'); //TODO: make a define for 92: backslash ascii char number
printf("> Slave %u: going to remove backslash from %s\n", tIndex, argv[i]);
if (nameStart != NULL )
nameStart++;
else
nameStart = argv[i];
printf("> Slave %u: got %s after removing backslash\n", tIndex, nameStart);
printf("noReqYet: %i\n", noReqYet);
if (!noReqYet)
strncpy(filenames[i], nameStart, strlen(nameStart));
else
strncpy(filenames[i] + reqIDLen + 1, nameStart,
strlen(nameStart)); //+1 to account for the hyphen.
// When freeing the argv table, only need to free(argv). There is no need to free argv[i] elements.
argv[i] = filenames[i];
}
else if (strstr(argv[i], FILETYPE_SEPARATOR) != NULL && strstr(argv[i], INDEX_FILETYPE) != NULL ) {
// If the current argument is the index type.
char*nameStart = strrchr(argv[i], '/');
if (nameStart != NULL ) {
nameStart++;
argv[i] = nameStart;
}
}
}
// Debug print the needed files.
printf("> Slave %u: awaiting files:", tIndex);
for (i = 0; i < argc; i++) {
if (filenames[i] != NULL ) {
printf(" %s", filenames[i]);
}
}
printf("\n");
// Await each input file from the client.
for (i = 0; i < argc; i++) {
if (filenames[i] != NULL ) {
files[i] = GetFile(clientFD, filenames[i]); // Note: GetFile invokes fclose!
if (files[i])
printf("> Slave %u: locally stored file %s\n", tIndex, filenames[i]);
}
}
}
printf("> Slave %u: going to invoke SatisfyRequest with arguments: ", tIndex);
for (i = reqIDIndex; i < argc; i++) {
if (argv[i] != NULL )
printf("%s ", argv[i]);
}
printf("\n");
// Process the request against the index.
int res = SatisfyRequest(argc, argv, tIndex, clientFD);
char* dummy;
if (!res)
dummy = "Your request succeeded.";
else
dummy = "Sorry, there was an error.";
// Terminate session with the client.
SendMsg(EndSession, (void*)dummy, strlen(dummy), clientFD);
printf("> Slave %u: task result delivered to client of file descriptor %i.\n", tIndex, clientFD);
close(clientFD);
free(threadData->m_Task);
free(argv);
if(!remoteOperation) {
free(g_Workers[tIndex].m_OutputDir);
}
// Free received file names and file pointer array.
for (i = 0; i < argc; i++) {
if (filenames[i])
free(filenames[i]);
}
free(filenames);
free(files);
// Reset thread-local data.
threadData->m_Task = NULL;
threadData->m_ClientFD = -1;
clientFD = -1;
printf("> Slave %u: task finished.\n", tIndex);
rc = pthread_mutex_unlock(tMutex);
if(!rc)
continue;
}
if (rc) {
printf("> Slave %u: mutex unlock returned an error: %i!\n", tIndex, rc);
perror("pthread_mutex_unlock()");
}
return NULL;
}
这是代码的客户端部分:
char*cmdBuffer = (char*)calloc(FILE_BUFFER, sizeof(char));
char*cmdRealPath = (char*)calloc(FILE_BUFFER, sizeof(char));
int auxNameTableSz = 25;
char**auxNameTable = (char**)calloc(auxNameTableSz, sizeof(char*));
char* ip = argv[2];
char* port = argv[3];
char* networkMode = argv[4];
// char* reqID = argv[5];
const int cmdStartIndex = 6;
short int remoteOperation;
if(!strcmp(networkMode, "remote")) {
remoteOperation = 1;
auxNameTableSz = 0;
}
else if(!strcmp(networkMode, "local")) {
// If the client is on the server's machine, need to convert argument files to absolute paths.
remoteOperation = 0;
int i = cmdStartIndex;
int j = 0;
while(i < argc) {
if(strrchr(argv[i], '/') || strrchr(argv[i], '.')) {
argv[i] = realpath(argv[i], NULL);
auxNameTable[j++] = argv[i];
}
i++;
}
auxNameTableSz = j;
}
else {
printf("> Client: argument four must be either \"global\" or \"local\"!");
free(cmdBuffer);
free(cmdRealPath);
FreePathsTable(&auxNameTable, auxNameTableSz);
DeallocateGlobals();
return 0;
}
// Concatenate client arguments into a single string.
MakeCommand(argc, argv, cmdBuffer);
printf("> Client: [Command]: %s\n", cmdBuffer);
// Establish a (TCP) connection with the server.
if(PrepareConnection(ip, port)) {
// Failed to prepare connection.
free(cmdBuffer);
free(cmdRealPath);
FreePathsTable(&auxNameTable, auxNameTableSz);
DeallocateGlobals();
return 0;
}
if(ConnectToServer()) {
// Failed to establish a stream socket connection.
free(cmdBuffer);
free(cmdRealPath);
FreePathsTable(&auxNameTable, auxNameTableSz);
DeallocateGlobals();
return 0;
}
// Send the client command to the server.
printf("> Client: sending command of %i bytes to server: [%s].\n", (int)strlen(cmdBuffer), cmdBuffer);
int count=0;
if((count=WriteToServer(BeginSession, cmdBuffer)) == -1) {
// Command send failed.
free(cmdBuffer);
free(cmdRealPath);
FreePathsTable(&auxNameTable, auxNameTableSz);
DeallocateGlobals();
return 0;
}
printf("> Client sent command to the server at %i bytes.\n", count);
if(remoteOperation) {
// Send the correct input files to the server.
SendInputFiles(argc, argv, cmdStartIndex);
}
else {
// Send the real path of the client's working directory to the server.
char *localDir = realpath(".", NULL);
count=WriteToServer(SendMessage, localDir);
printf("> Client sent the real path to the server at %i bytes.\n", count);
free(localDir);
}
// Await reply from the server.
char *buffer = (char*) calloc(FILE_BUFFER, sizeof(char));
count = ReceiveFromServer(buffer);
char *backPtr = buffer;
if(remoteOperation) {
printf("> Client: got output list [%s] totalling %i bytes.\n", buffer, count);
// Receive the files produced by the server if the client and server are not on the same machine.
ReceiveOutputFiles(buffer);
// Await session termination.
buffer = backPtr;
memset(buffer, 0, sizeof(char)*FILE_BUFFER);
count = ReceiveFromServer(buffer);
}
else {
}
printf("> Client: server response: [%s]-[%s]\n", buffer, cmdBuffer);