0

我对无锁数据结构很陌生,所以对于我编写的练习(我希望用作)一个有界无锁双端队列(还没有调整大小,只是想让基本案例工作)。我只是想从知道他们在做什么的人那里得到一些确认,以确认我是否有正确的想法和/或如何改进这一点。

class LocklessDeque
{
  public:

    LocklessDeque() : m_empty(false),
                      m_bottom(0),
                      m_top(0) {}


    ~LocklessDeque()
    {
      // Delete remaining tasks
      for( unsigned i = m_top; i < m_bottom; ++i )
        delete m_tasks[i];
    }


    void PushBottom(ITask* task)
    {
      m_tasks[m_bottom] = task;

      InterlockedIncrement(&m_bottom);
    }


    ITask* PopBottom()
    {
      if( m_bottom - m_top > 0 )
      {
        m_empty = false;

        InterlockedDecrement(&m_bottom);

        return m_tasks[m_bottom];
      }

      m_empty = true;

      return NULL;
    }


    ITask* PopTop()
    {
      if( m_bottom - m_top > 0 )
      {
        m_empty = false;

        InterlockedIncrement(&m_top);

        return m_tasks[m_top];
      }

      m_empty = true;

      return NULL;
    }


    bool IsEmpty() const
    {
      return m_empty;
    }

  private:

    ITask* m_tasks[16];

    bool m_empty;

    volatile unsigned m_bottom;
    volatile unsigned m_top;

};
4

4 回答 4

3

看着这个,我认为这将是一个问题:

void PushBottom(ITask* task)
{
  m_tasks[m_bottom] = task;
  InterlockedIncrement(&m_bottom);
}

如果在实际的多线程环境中使用它,我认为您在设置m_tasks[m_bottom]. 想想如果你有两个线程同时尝试这样做会发生什么——你不能确定哪个线程实际设置了 m_tasks[m_bottom]。

查看这篇文章,这是对无锁队列的合理讨论。

于 2009-12-02T19:18:06.700 回答
2

您使用m_bottomandm_top成员来索引数组是不行的。您可以使用 InterlockedXxxx() 的返回值来获取安全索引。您需要丢失 IsEmpty(),它在多线程场景中永远不会准确。PopXxx 中的空支票也有同样的问题。我不明白没有互斥锁你怎么能做到这一点。

于 2009-12-02T20:03:09.807 回答
1

做这种几乎不可能的事情的关键是使用 InterlockedCompareExchange。(这是 Win32 使用的名称,但任何支持多线程的平台都将具有等效的 InterlockedCompareExchange)。

这个想法是,你制作一个结构的副本(它必须足够小以执行原子读取(64 或者如果你可以处理一些不可移植性,x86 上的 128 位)。

您使用建议的更新制作另一个副本,执行逻辑并更新副本,然后使用 InterlockedCompareExchange 更新“真实”结构。InterlockedCompareExchange 所做的是,原子地确保该值仍然是您在状态更新之前开始使用的值,如果它仍然是该值,则使用新状态原子地更新该值。通常,这被包裹在一个无限循环中,该循环一直在尝试,直到其他人没有更改中间的值。这是大致的模式:

union State
{
    struct
    {
        short a;
        short b;
    };
    uint32_t volatile rawState;
} state;

void DoSomething()
{
    // Keep looping until nobody else changed it behind our back
    for (;;)
    {
        state origState;
        state newState;

        // It's important that you only read the state once per try
        origState.rawState = state.rawState;
        // This must copy origState, NOT read the state again
        newState.rawState = origState.rawState;

        // Now you can do something impossible to do atomically...
        // This example takes a lot of cycles, there is huge
        // opportunity for another thread to come in and change
        // it during this update
        if (newState.b == 3 || newState.b % 6 != 0)
        {
            newState.a++;
        }

        // Now we atomically update the state,
        // this ONLY changes state.rawState if it's still == origState.rawState
        // In either case, InterlockedCompareExchange returns what value it has now
        if (InterlockedCompareExchange(&state.rawState, newState.rawState,
                origState.rawState) == origState.rawState)
            return;
    }
}

(如果上面的代码实际上没有编译,请原谅 - 我把它写在我的头上)

伟大的。现在你可以让无锁算法变得简单。错误的!问题是您可以以原子方式更新的数据量受到严重限制。

一些无锁算法使用一种“帮助”并发线程的技术。例如,假设您有一个希望能够从多个线程更新的链表,其他线程可以通过对“first”和“last”指针进行更新来“帮助”它们,如果它们正在运行并看到它们是在“last”指向的节点上,但节点中的“next”指针不为空。在这个例子中,当注意到“最后一个”指针是错误的时,他们更新最后一个指针,只有当它仍然指向当前节点时,使用一个互锁的比较交换。

不要落入“旋转”或循环的陷阱(如自旋锁)。虽然短暂旋转很有价值,因为您希望“其他”线程完成某些事情 - 他们可能不会。“其他”线程可能已经进行了上下文切换,并且可能不再运行。您只是在消耗 CPU 时间,通过旋转直到条件为真来烧电(也许会杀死笔记本电脑的电池)。在你开始旋转的那一刻,你不妨扔掉你的无锁代码并用锁编写它。锁比无界旋转好。

只是为了从困难到荒谬,考虑一下你可能会让自己陷入其他架构的混乱——x86/x64 上的事情通常是相当宽容的,但是当你进入其他“弱排序”架构时,你就会进入事情发生的领域毫无意义 - 内存更新不会按程序顺序发生,因此您对其他线程正在做什么的所有心理推理都会消失。(即使 x86/x64 也有一种称为“写入组合”的内存类型,它通常在更新视频内存时使用,但可用于任何需要栅栏的内存缓冲区硬件)这些架构要求您使用“内存栅栏”操作来保证围栏之前的所有读/写/两者都将是全局可见的(由其他核心)。写栅栏保证栅栏之前的任何写入在栅栏之后的任何写入之前都是全局可见的。读栅栏将保证栅栏之后的读取不会在栅栏之前推测性地执行。读/写栅栏(又名完整栅栏或内存栅栏)将同时保证。栅栏非常昂贵。(有些人使用术语“障碍”而不是“围栏”)

我的建议是先用锁/条件变量来实现它。如果您在完美运行时遇到任何问题,那么尝试进行无锁实现是没有希望的。并且总是测量,测量,测量。您可能会发现使用锁实现的性能非常好 - 没有一些不稳定的无锁实现的不确定性和一个讨厌的挂起错误,只有在您向重要客户进行演示时才会出现。也许您可以通过将原始问题重新定义为更容易解决的问题来解决问题,也许可以通过重组工作以使更大的项目(或成批的项目)进入集合,从而减轻整个事情的压力。

Writing lockless concurrent algorithms is very difficult (as you've seen written 1000x elsewhere I'm sure). It is often not worth the effort either.

于 2011-09-15T08:18:35.950 回答
0

解决亚伦指出的问题,我会做类似的事情:

void PushBottom(ITask *task) { 
    int pos = InterlockedIncrement(&m_bottom);
    m_tasks[pos] = task;
}

同样,要弹出:

ITask* PopTop() {
  int pos = InterlockedIncrement(&m_top);
  if (pos == m_bottom) // This is still subject to a race condition.
      return NULL;
  return m_tasks[pos];
}

我会从设计中完全消除m_empty两者。IsEmpty()IsEmpty 返回的结果受制于竞争条件,这意味着当您查看该结果时,它很可能已经过时(即,当您查看它返回的内容时,它告诉您的关于队列的内容可能是错误的)。同样,m_empty它只提供没有它就已经可用的信息记录,一种产生陈旧数据的方法。使用m_empty并不能保证它不能正常工作,但它确实会大大增加出现错误的机会,IMO。

我猜这是由于代码的临时性质,但现在你在数组边界方面也有一些严重的问题。你没有做任何事情来强制你的数组索引环绕,所以一旦你尝试将第 17 个任务推入队列,你就会遇到一个大问题。

编辑:我应该指出,评论中提到的竞争条件非常严重——而且它也不是唯一的。虽然比原来的要好一些,但这应该被误认为是可以正常工作的代码。

我会说编写正确的无锁代码比编写使用锁的正确代码要困难得多。我不知道有谁在没有对使用锁定的代码有深入了解的情况下这样。基于原始代码,我会说最好从编写和理解使用锁的队列的代码开始,并且只有当您使用它来更好地理解所涉及的问题时才会真正使尝试无锁代码。

于 2009-12-02T20:03:53.637 回答