15

我正在为密集型网络应用程序实现一个无锁的单一生产者单一消费者队列。我有一堆工作线程在他们自己的单独队列中接收工作,然后他们将其出列并处理。

从这些队列中移除锁极大地提高了高负载下的性能,但是当队列为空时它们不再阻塞,这反过来又导致 CPU 使用率飙升。

如何有效地导致线程阻塞,直到它可以成功地使某些东西出队或被杀死/中断?

4

6 回答 6

16

如果您使用的是 Linux,请考虑使用Futex。它通过使用原子操作而不是像互斥锁那样的内核调用来提供非锁定实现的性能,但是如果由于某些条件不成立(即锁争用)而需要将进程设置为空闲,它将然后进行适当的内核调用以使进程进入睡眠状态并在将来的事件中将其唤醒。它基本上就像一个非常快的信号量。

于 2011-05-22T18:44:00.977 回答
9

在 Linux 上,futex可用于阻塞线程。但请注意,Futex 很棘手

更新:条件变量比 futex 使用起来更安全,并且更便携。但是,条件变量与互斥锁结合使用,因此严格来说结果将不再是无锁的。但是,如果您的主要目标是性能(而不是全局进度的保证),并且锁定部分(即线程唤醒后检查的条件)很小,那么您可能会得到满意的结果而无需进入将 futex 集成到算法中的微妙之处。

于 2011-05-22T18:45:01.003 回答
7

如果您在 Windows 上,您将无法使用 futex,但 Windows Vista 有一个类似的机制,称为Keyed Events。不幸的是,这不是已发布 API 的一部分(它是 NTDLL 本机 API),但您可以使用它,只要您接受它可能会在未来版本的 Windows 中更改的警告(并且您不需要在Vista 之前的内核)。请务必阅读我上面链接的文章。这是它如何工作的未经测试的草图:

/* Interlocked SList queue using keyed event signaling */

struct queue {
    SLIST_HEADER slist;
    // Note: Multiple queues can (and should) share a keyed event handle
    HANDLE keyed_event;
    // Initial value: 0
    // Prior to blocking, the queue_pop function increments this to 1, then
    // rechecks the queue. If it finds an item, it attempts to compxchg back to
    // 0; if this fails, then it's racing with a push, and has to block
    LONG block_flag;
};

void init_queue(queue *qPtr) {
    NtCreateKeyedEvent(&qPtr->keyed_event, -1, NULL, 0);
    InitializeSListHead(&qPtr->slist);
    qPtr->blocking = 0;
}

void queue_push(queue *qPtr, SLIST_ENTRY *entry) {
    InterlockedPushEntrySList(&qPtr->slist, entry);

    // Transition block flag 1 -> 0. If this succeeds (block flag was 1), we
    // have committed to a keyed-event handshake
    LONG oldv = InterlockedCompareExchange(&qPtr->block_flag, 0, 1);
    if (oldv) {
        NtReleaseKeyedEvent(qPtr->keyed_event, (PVOID)qPtr, FALSE, NULL);
    }
}

SLIST_ENTRY *queue_pop(queue *qPtr) {
    SLIST_ENTRY *entry = InterlockedPopEntrySList(&qPtr->slist);
    if (entry)
        return entry; // fast path

    // Transition block flag 0 -> 1. We must recheck the queue after this point
    // in case we race with queue_push; however since ReleaseKeyedEvent
    // blocks until it is matched up with a wait, we must perform the wait if
    // queue_push sees us
    LONG oldv = InterlockedCompareExchange(&qPtr->block_flag, 1, 0);

    assert(oldv == 0);

    entry = InterlockedPopEntrySList(&qPtr->slist);
    if (entry) {
        // Try to abort
        oldv = InterlockedCompareExchange(&qPtr->block_flag, 0, 1);
        if (oldv == 1)
            return entry; // nobody saw us, we can just exit with the value
    }

    // Either we don't have an entry, or we are forced to wait because
    // queue_push saw our block flag. So do the wait
    NtWaitForKeyedEvent(qPtr->keyed_event, (PVOID)qPtr, FALSE, NULL);
    // block_flag has been reset by queue_push

    if (!entry)
        entry = InterlockedPopEntrySList(&qPtr->slist);
    assert(entry);

    return entry;
}

您还可以使用类似的协议,使用Slim Read Write locks 和Condition Variables,并使用无锁快速路径。这些是键控事件的包装器,因此它们可能比直接使用键控事件产生更多开销。

于 2011-05-23T01:40:05.333 回答
1

你试过条件等待吗?当队列变空时,就开始等待新的工作。将作业放入队列的线程应该触发信号。这样你只在队列为空时使用锁。

https://computing.llnl.gov/tutorials/pthreads/#ConditionVariables

于 2011-05-22T18:49:01.217 回答
1

您可以使用 sigwait() 函数使线程休眠。您可以使用 pthread_kill 唤醒线程。这比条件变量快得多。

于 2011-06-15T20:15:00.363 回答
0

您可以在等待时添加睡眠。只需选择您愿意等待的最大等待时间,然后执行以下操作(伪代码,因为我不记得 pthread 语法):

WAIT_TIME = 100; // Set this to whatever you're happy with
while(loop_condition) {
   thing = get_from_queue()
   if(thing == null) {
       sleep(WAIT_TIME);
   } else {
       handle(thing);
   }
}

即使是像 100 毫秒睡眠这样的短时间也应该会显着降低 CPU 使用率。我不确定在什么时候上下文切换会比忙着等待更糟糕。

于 2011-05-22T18:40:13.863 回答