0
#include "cs241.c"

#define THREAD_COUNT 10

int list_s;
int connections[THREAD_COUNT];
char space[THREAD_COUNT];

int done = 0;

pthread_mutex_t muxlock = PTHREAD_MUTEX_INITIALIZER;

int *numbers;
int numbers_count;

void *listener(void *arg) {
    int n = *(int *) arg;

    FILE *f = fdopen(connections[n], "r");
    if (f == NULL)
        printf("Could not open file\n");
    char *line = NULL;
    size_t *len = malloc(sizeof(int));
    while(getline(&line, len, f) != -1) {
        printf("%s", line);
        if (strcmp("END", line) == 0) {
                            pthread_mutex_lock(&muxlock);
            done = 1;
            pthread_mutex_unlock(&muxlock);                     
        }
    }

    space[n] = 't';
    fclose(f);
    free(len);
    close(connections[n]);
    return NULL;
}

void initialize() {
    int n;
    for (n = 0; n < THREAD_COUNT; n++) {
        space[n] = 't';
    }
}

int check() {
    int index;
    for (index = 0; index < THREAD_COUNT; index++) {
        if (space[index] == 't') {
            space[index] = 'f';
            return index;
        }
    }
    return 0;
}   

int main(int argc, char *argv[]) {
    int port = 0;
    int binder;
    int lis;
    int i = 0;
    int *j = malloc(sizeof(int));
    initialize();
    pthread_t threads[THREAD_COUNT];

    if ((argc != 2) || (sscanf(argv[1], "%d", &port) != 1)) {
        fprintf(stderr, "Usage: %s [PORT]\n", argv[0]);
        exit(1);
    }

    if (port < 1024) {
        fprintf(stderr, "Port must be greater than 1024\n");
        exit(1);
    }

    // set the initial conditions for the numbers array.
    numbers = malloc(sizeof(int));
    numbers_count = 0;

    struct sockaddr_in servaddr; // socket address structure
    // set all bytes in socket address structure to zero, and fill in the relevant data members
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(port);

    list_s = socket(AF_INET, SOCK_STREAM, 0);
    if (list_s < 0) {
        printf("Could not create socket\n");
        exit(EXIT_FAILURE);
    }
    binder = bind(list_s, (struct sockaddr *) &servaddr, sizeof(servaddr));
    if (binder < 0) {
        printf("Could not bind socket\n");
        exit(EXIT_FAILURE);
    }
    lis = listen(list_s, SOMAXCONN);
    if (lis < 0) {
        printf("Could not listen on socket\n");
        exit(EXIT_FAILURE);
    }
    SET_NON_BLOCKING(list_s);
    while (done != 1) {
        connections[i] = accept(list_s, NULL, NULL);
        if (connections[i] < 0) {
            if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
                continue;
            printf("Could not accept connection\n");
            exit(EXIT_FAILURE);
        }
        i = check();
        *j = i;
                    SET_BLOCKING(list_s);
        pthread_create(&threads[i], NULL, listener, j);
    }

    // Verify the array.
    //verify(numbers, numbers_count);

    free(j);
    close(list_s);

    exit(EXIT_SUCCESS);

}

所以我的main()中有一个while循环,当'done' = 1时应该退出。这是由listener()设置的,当它接收到'END'时。第一个问题是,当我在第一次迭代中发送“END”时,while 循环不会退出,只有在发送了另一个“END”之后。

我在另一个文件中有两个宏 SET_NON_BLOCKING 和 SET_BLOCKING 用于解除阻塞和阻塞套接字,因此如果没有连接,它会等待连接。下一个问题是当我不使用这些宏时,listener() 中的 getline() 无法读取从流中输出的所有内容。当我使用它们时,它根本无法打开流。

我认为一些问题在于将“j”传递给线程,因为当第二个线程启动时,它会在第一个线程可以读取之前覆盖“j”。但是,我已经在它上面呆了几天,却无处可去。任何帮助将不胜感激。谢谢!

另外我想问一下我的套接字阻塞和线程锁定是否在正确的位置?

4

2 回答 2

1

您可能必须发送 END 两次,因为您的主线程在accept()生成侦听器线程后立即阻塞。由于它被阻止accept()它看不到done == 1

您可以通过保持非阻塞模式来解决此问题。如果你这样做,你可能想睡觉以避免在一个紧密的循环中旋转。另一种选择是发送一个信号来唤醒接受,使其设置 EINTR。

就将连接索引传递给侦听器线程而言,您可能是正确的,线程之间存在覆盖值的竞争。由于 anint需要的字节数与 a 相同或更少void *,因此您实际上可以int直接传递 a 。例如:

pthread_create(&threads[i], NULL, listener, (void *)i);

然后在监听器中:

int n = (int)arg;

这是一种黑客行为。更完整的解决方案是使用 malloc 为每个线程分配一个单独的参数结构。侦听器线程将负责释放参数。

struct params *p = malloc(sizeof(struct params));
p.index = i;

pthread_create(&threads[i], NULL, listener, p);

然后在监听器中:

struct params *p = args;
if (p == NULL) {
  // handle error
  return NULL;
}
int n = p->index;
free(p);
于 2012-12-02T19:22:37.333 回答
1

您管理connections[]andspace[]数组的方式似乎有很多问题:

  • 您接受一个连接connections[i],但i您传递给listener线程的是返回的check()那个 - 这可能是完全不同的i
  • 每个线程都获得了相同的线程参数地址 -j仅分配一次,并且每次创建线程时都使用相同的分配。如果两个或更多线程几乎同时启动,您将丢失应该传递给其中一些线程的索引。
  • 当它完成时,listener将连接标记为可用(我认为这就是目的space[n] = 't';),然后用于connections[n]关闭连接。这些都不是通过任何同步来执行的。

其他一些评论:

  • 你到底为什么要动态分配len
  • doneinmain()的循环检查while应受互斥锁保护
  • 我不确定传输数据的确切协议是什么,但由于您使用的是getline()我假设它是一组以换行符结尾的行。在这种情况下,请记住getline()文档中的这一点信息:“缓冲区以空值结尾并包括换行符,如果找到的话”。如果该"END"行包含换行符,strcmp("END",line)则不会返回0
于 2012-12-02T19:32:38.450 回答