34

使用Grand Central Dispatch,可以轻松地在非主线程上执行耗时的任务,避免阻塞主线程并保持 UI 响应。只需在全局并发队列上使用dispatch_async和执行任务即可。

dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
    // code
});

然而,有些事情听起来好得令人难以置信,比如这个通常有其缺点。在我们的 iOS 应用项目中大量使用它之后,最近我们发现它有 64 个线程的限制。一旦我们达到限制,应用程序将冻结/挂起。通过使用 Xcode 暂停应用程序,我们可以看到主线程由semaphore_wait_trap.

在网上搜索确认其他人也遇到了这个问题,但到目前为止还没有找到解决方案。

已达到调度线程硬限制:64(同步操作中阻塞的调度线程太多)

另一个stackoverflow问题证实了使用dispatch_syncand时dispatch_barrier_async也会出现这个问题。

问题:
由于 Grand Central Dispatch 有 64 个线程的限制,是否有任何解决方法?

提前致谢!

4

1 回答 1

70

好吧,如果你有约束力和决心,你可以摆脱 GCD 的束缚,然后继续使用 pthreads 突破操作系统每个进程的线程限制,但底线是:如果你正在打GCD 中的队列宽度限制,您可能需要考虑重新评估您的并发方法。

在极端情况下,有两种方法可以达到极限:

  1. 您可以通过阻塞系统调用在某些操作系统原语上阻塞 64 个线程。(I/O 绑定)
  2. 您可以合法地同时准备好 64 个可运行的任务。(CPU 绑定)

如果您处于情况 #1,那么推荐的方法是使用非阻塞 I/O。事实上,GCD 有一大堆调用,在 10.7/Lion IIRC 中引入,它们促进了 I/O 的异步调度并提高了线程重用。如果您使用 GCD I/O 机制,那么这些线程将不会被捆绑等待 I/O,当您的文件描述符(或 mach 端口)上的数据可用时,GCD 只会将您的块(或函数)排队。请参阅dispatch_io_create和朋友的文档。

如果有帮助,这里有一个使用 GCD I/O 机制实现的 TCP 回显服务器的小示例(不提供保证):

in_port_t port = 10000;
void DieWithError(char *errorMessage);

// Returns a block you can call later to shut down the server -- caller owns block.
dispatch_block_t CreateCleanupBlockForLaunchedServer()
{
    // Create the socket
    int servSock = -1;
    if ((servSock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
        DieWithError("socket() failed");
    }

    // Bind the socket - if the port we want is in use, increment until we find one that isn't
    struct sockaddr_in echoServAddr;
    memset(&echoServAddr, 0, sizeof(echoServAddr));
    echoServAddr.sin_family = AF_INET;
    echoServAddr.sin_addr.s_addr = htonl(INADDR_ANY);
    do {
        printf("server attempting to bind to port %d\n", (int)port);
        echoServAddr.sin_port = htons(port);
    } while (bind(servSock, (struct sockaddr *) &echoServAddr, sizeof(echoServAddr)) < 0 && ++port);

    // Make the socket non-blocking
    if (fcntl(servSock, F_SETFL, O_NONBLOCK) < 0) {
        shutdown(servSock, SHUT_RDWR);
        close(servSock);
        DieWithError("fcntl() failed");
    }

    // Set up the dispatch source that will alert us to new incoming connections
    dispatch_queue_t q = dispatch_queue_create("server_queue", DISPATCH_QUEUE_CONCURRENT);
    dispatch_source_t acceptSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, servSock, 0, q);
    dispatch_source_set_event_handler(acceptSource, ^{
        const unsigned long numPendingConnections = dispatch_source_get_data(acceptSource);
        for (unsigned long i = 0; i < numPendingConnections; i++) {
            int clntSock = -1;
            struct sockaddr_in echoClntAddr;
            unsigned int clntLen = sizeof(echoClntAddr);

            // Wait for a client to connect
            if ((clntSock = accept(servSock, (struct sockaddr *) &echoClntAddr, &clntLen)) >= 0)
            {
                printf("server sock: %d accepted\n", clntSock);

                dispatch_io_t channel = dispatch_io_create(DISPATCH_IO_STREAM, clntSock, q, ^(int error) {
                    if (error) {
                        fprintf(stderr, "Error: %s", strerror(error));
                    }
                    printf("server sock: %d closing\n", clntSock);
                    close(clntSock);
                });

                // Configure the channel...
                dispatch_io_set_low_water(channel, 1);
                dispatch_io_set_high_water(channel, SIZE_MAX);

                // Setup read handler
                dispatch_io_read(channel, 0, SIZE_MAX, q, ^(bool done, dispatch_data_t data, int error) {
                    BOOL close = NO;
                    if (error) {
                        fprintf(stderr, "Error: %s", strerror(error));
                        close = YES;
                    }

                    const size_t rxd = data ? dispatch_data_get_size(data) : 0;
                    if (rxd) {
                        // echo...
                        printf("server sock: %d received: %ld bytes\n", clntSock, (long)rxd);
                        // write it back out; echo!
                        dispatch_io_write(channel, 0, data, q, ^(bool done, dispatch_data_t data, int error) {});
                    }
                    else {
                        close = YES;
                    }

                    if (close) {
                        dispatch_io_close(channel, DISPATCH_IO_STOP);
                        dispatch_release(channel);
                    }
                });
            }
            else {
                printf("accept() failed;\n");
            }
        }
    });

    // Resume the source so we're ready to accept once we listen()
    dispatch_resume(acceptSource);

    // Listen() on the socket
    if (listen(servSock, SOMAXCONN) < 0) {
        shutdown(servSock, SHUT_RDWR);
        close(servSock);
        DieWithError("listen() failed");
    }

    // Make cleanup block for the server queue
    dispatch_block_t cleanupBlock = ^{
        dispatch_async(q, ^{
            shutdown(servSock, SHUT_RDWR);
            close(servSock);
            dispatch_release(acceptSource);
            dispatch_release(q);
        });
    };

    return Block_copy(cleanupBlock);
}

无论如何......回到手头的话题:

如果您处于第二种情况,您应该问自己:“我真的通过这种方法获得了什么吗?” 假设您拥有最出色的 MacPro——12 个内核、24 个超线程/虚拟内核。有 64 个线程,你有一个大约。3:1 线程与虚拟内核的比率。上下文切换和缓存未命中不是免费的。请记住,我们假设您在这种情况下不受 I/O 限制,因此您通过拥有比内核更多的任务所做的所有事情都是在上下文切换和缓存抖动上浪费 CPU 时间。

实际上,如果您的应用程序因达到队列宽度限制而挂起,那么最有可能的情况是您的队列已饿死。您可能已经创建了一个减少到死锁的依赖项。我最常看到的情况是当多个互锁线程试图dispatch_sync在同一个队列上时,当没有线程剩余时。这总是失败的。

原因如下:队列宽度是一个实现细节。GCD 的 64 线程宽度限制没有记录,因为设计良好的并发架构不应该依赖于队列宽度。您应该始终设计您的并发架构,以便 2 线程宽的队列最终将完成与 1000 线程宽的队列相同的结果(如果更慢)。如果你不这样做,你的队列总是有可能会饿死。将您的工作负载划分为可并行化的单元应该让您自己接受优化的可能性,而不是基本功能的要求。在开发期间强制执行此规则的一种方法是尝试在使用并发队列但期望非互锁行为的地方使用串行队列。

另外,对于你原来的问题的精确点:IIUC,64 个线程的限制是每个顶级并发队列64 个线程,所以如果你真的觉得需要,你可以使用所有三个顶级并发队列(默认、高和低优先级)以实现总共超过 64 个线程。请不要这样做。修复您的设计,使其不会饿死自己。你会更快乐。无论如何,正如我上面所暗示的,如果你正在饿死一个 64 线程宽的队列,你最终可能只会填满所有三个顶级队列和/或遇到每个进程的线程限制,并且也会这样饿死自己。

于 2013-03-01T04:57:26.510 回答