4

我知道阻塞队列和无锁队列,这是Scott 等人提供的这些实现的一个很好的例子。,但是有无锁阻塞队列的实现吗?

在无锁阻塞队列中,出队不需要锁定,但如果队列中没有项目,它将阻塞消费者。有没有这种野兽的实现?我更喜欢它们是 C# 实现,但任何实现在技术上都可以工作。

更新:

我想我最终在 D14.1 行出现了竞争条件:

initialize(Q: pointer to queue t)
node = new node() // Allocate a free node
node–>next.ptr = NULL // Make it the only node in the linked list
Q–>Head = Q–>Tail = node // Both Head and Tail point to it
signal = new ManualResetEvent() // create a manual reset event

    enqueue(Q: pointer to queue t, value: data type)
E1:     node = new node() // Allocate a new node from the free list
E2:     node–>value = value // Copy enqueued value into node
E3:     node–>next.ptr = NULL // Set next pointer of node to NULL
E4:     loop // Keep trying until Enqueue is done
E5:         tail = Q–>Tail // Read Tail.ptr and Tail.count together
E6:         next = tail.ptr–>next // Read next ptr and count fields together
E7:         if tail == Q–>Tail // Are tail and next consistent?
E8:             if next.ptr == NULL // Was Tail pointing to the last node?
E9:                 if CAS(&tail.ptr–&gt;next, next, <node, next.count+1>) // Try to link node at the end of the linked list
E10.1:                  signal.Set() // Signal to the blocking dequeues
E10.2:                  break // Enqueue is done. Exit loop
E11:                endif
E12:            else // Tail was not pointing to the last node
E13:                CAS(&Q–&gt;Tail, tail, <next.ptr, tail.count+1>) // Try to swing Tail to the next node
E14:            endif
E15:        endif
E16:     endloop
E17:    CAS(&Q–&gt;Tail, tail, <node, tail.count+1>) // Enqueue is done. Try to swing Tail to the inserted node


    dequeue(Q: pointer to queue t, pvalue: pointer to data type): boolean
D1:     loop // Keep trying until Dequeue is done
D2:         head = Q–&gt;Head // Read Head
D3:         tail = Q–&gt;Tail // Read Tail
D4:         next = head–&gt;next // Read Head.ptr–&gt;next
D5:         if head == Q–&gt;Head // Are head, tail, and next consistent?
D6:             if head.ptr == tail.ptr // Is queue empty or Tail falling behind?
D7:                 if next.ptr == NULL // Is queue empty?
D8.1:                   signal.WaitOne() // Block until an enqueue
D8.X:                   // remove the return --- return FALSE // Queue is empty, couldn’t dequeue
D9:                 endif
D10:                CAS(&Q–&gt;Tail, tail, <next.ptr, tail.count+1>) // Tail is falling behind. Try to advance it
D11:            else // No need to deal with Tail
                    // Read value before CAS, otherwise another dequeue might free the next node
D12:                *pvalue = next.ptr–&gt;value
D13:                if CAS(&Q–&gt;Head, head, <next.ptr, head.count+1>) // Try to swing Head to the next node
D14.1:                  if(head.ptr == tail.ptr && next.ptr==NULL) // Is queue empty? <--- POSSIBLE RACE CONDITION???
D14.2:                      signal.Reset()
D14.3:                  break // Dequeue is done. Exit loop
D15:                endif
D16:            endif
D17:         endif
D18:    endloop
D19:    free(head.ptr) // It is safe now to free the old dummy node
D20:    return TRUE // Queue was not empty, dequeue succeeded
4

2 回答 2

1

.NET 并行扩展:(内置,用于 .NET 4.0+):

http://blogs.msdn.com/b/pfxteam/archive/2010/01/26/9953725.aspx


StackOverflow 实现中的某个人:

.net 中的无锁结构



回应评论中的澄清:

如果空时阻塞不忙(等待信号),那么您似乎需要一个计数信号量来等待。

另一种方法是使用常规队列,结合原子比较和交换或自旋锁以防止同时访问,
然后如果消费者线程在队列为空时尝试进入,则锁定二进制信号量,
如果提供者线程在队列时尝试进入为空,解锁二进制信号量以唤醒所有休眠消费者(并将它们返回到自旋锁,以便多个线程只有在队列中有足够的项目时才能进入)。

eg // 伪代码

/// Non-blocking lock (busy wait)
void SpinLock()
{
    While (CompareAndExchange(myIntegerLock, -1, 0) != 0)
    {
        // wait
    }
}

void UnSpinLock()
{
    Exchange(myIntegerLock, 0);
}

void AddItem(item)
{
    // Use CAS for synchronization
    SpinLock(); // Non-blocking lock (busy wait)

    queue.Push(item);

    // Unblock any blocked consumers
    if (queue.Count() == 1)
    {
        semaphore.Increase();
    }

    // End of CAS synchronization block
    UnSpinLock();
}

Item RemoveItem()
{
    // Use CAS for synchronization
    SpinLock(); // Non-blocking lock (busy wait)

    // If empty then block
    if (queue.Count() == 0)
    {
        // End of CAS synchronization block
        UnSpinLock();

        // Block until queue is not empty
        semaphore.Decrease();

        // Try again (may fail again if there is more than one consumer)
        return RemoveItem();
    }

    result = queue.Pop();

    // End of CAS synchronization block
    UnSpinLock();

    return result;
}
于 2011-04-20T18:01:16.767 回答
1

编辑:

SIMPLER: 我建议您的队列不需要头部和尾部。只要有头。如果head = NULL,则列表为空。将项目添加到头部。从头部移除物品。更简单、更少的 CAS 操作。

HELPER: 我在评论中建议您需要考虑一个帮助方案来处理比赛。在我的“无锁”版本中,如果它们不会引起问题,那么可以有罕见的竞争条件。我喜欢额外的性能,而不是让空闲线程睡眠几毫秒太久。

帮手的想法。当消费者抓住工作时,它可以检查是否有处于昏迷状态的线程。当生产者添加工作时,它可以在昏迷中寻找线程。

所以跟踪卧铺。使用睡眠者的链接列表。当一个线程决定没有工作时,它会将自己标记为 !awake 并将 CAS 自己标记为 sleeper 列表的头部。当接收到唤醒信号时,线程将自身标记为已唤醒。然后新唤醒的线程清理 sleeper 列表。要清理并发单链表,你必须小心。你只能对头部进行CAS。因此,当睡眠者列表的头部标记为唤醒时,您可以 CAS 关闭头部。如果头部没有醒来,继续扫描列表并“懒惰地取消链接”(我把那个术语做了)剩余的清醒项目。延迟取消链接很简单......只需将上一个项目的下一个 ptr 设置在唤醒项目上。并发扫描仍然会到达列表的末尾,即使它到达 !awake 的项目。随后的扫描会看到一个较短的列表。最后,每当您添加工作或完成工作时,请扫描睡眠者列表以查找 !awake 项目。如果消费者在抓取一些工作后注意到工作仍然存在(.next work != NULL),消费者可以扫描睡眠者列表并通知第一个线程是 !awake。生产者添加工作后,生产者可以扫描 sleeper 列表并执行相同操作。

如果您有广播场景并且无法向单个线程发出信号,那么只需保持睡眠线程的计数即可。虽然该计数仍然 > 0,但消费者注意到剩余工作和消费者添加工作将广播信号以唤醒。

在我们的环境中,每个 SMT 有 1 个线程,因此休眠列表永远不会那么大(除非我得到其中一台新的 128 个并发线程机器!)我们在事务的早期生成工作项。在第一秒内,我们可能会生成 10,000 个工作项,而这种生产会迅速减少。线程在这些工作项上工作了几秒钟。所以,我们很少有空闲池上的线程。

你仍然可以使用锁 如果你只有 1 个线程并且很少产生工作......这对你不起作用。在这种情况下,互斥体的性能无关紧要,您应该只使用它们。在这种情况下,对睡眠队列使用锁。将无锁视为“重要的地方没有锁”。

上一篇文章: 你是说:有一个工作队列。有很多消费者线程。如果有任何工作,消费者需要拉出工作并执行消费者线程需要休眠直到有工作。

如果你是,我们只使用原子操作来做到这一点:

工作队列是一个链表。还有一个休眠线程的链表。

添加工作:CAS 列表的头部到新工作。添加工作时,我们检查睡眠列表中是否有任何线程。如果有,在添加工作之前,我们将一个睡眠者从睡眠者列表中移除,设置其工作=新工作,然后通知睡眠者唤醒。我们将工作添加到工作队列中。

消耗工作: CAS 列表的头部到 head->next。如果工作列表的头部为 NULL,我们将线程 CAS 放入睡眠者列表。

一旦一个线程有一个工作项,该线程必须将该工作项的状态 CAS 设置为 WORK_INPROGRESS 或类似的。如果失败,则意味着工作正在由另一个人执行,因此消费者线程返回搜索工作。如果一个线程被唤醒并且有一个工作项,它仍然需要对状态进行 CAS。

因此,如果添加工作,睡觉的消费者总是会被唤醒并交给工作。pthread_kill() 总是在 sigwait() 唤醒线程,因为即使线程在信号之后到达 sigwait,也会收到信号。这解决了线程将自己置于睡眠者列表但在进入睡眠之前得到信号的问题。所发生的只是线程试图拥有它的 ->work(如果有的话)。未能拥有工作或没有工作将线程发送回消费开始。如果一个线程未能 CAS 到 sleeper 列表,则意味着另一个线程击败了它,或者生产者拉下了一个 sleeper。为了安全起见,我们让线程的行为就像它刚刚被唤醒一样。

我们这样做没有竞争条件,并且有多个生产者和消费者。我们还能够扩展它以允许线程也可以在单个工作项上休眠。

于 2011-04-20T20:59:29.413 回答