做这种几乎不可能的事情的关键是使用 InterlockedCompareExchange。(这是 Win32 使用的名称,但任何支持多线程的平台都将具有等效的 InterlockedCompareExchange)。
这个想法是,你制作一个结构的副本(它必须足够小以执行原子读取(64 或者如果你可以处理一些不可移植性,x86 上的 128 位)。
您使用建议的更新制作另一个副本,执行逻辑并更新副本,然后使用 InterlockedCompareExchange 更新“真实”结构。InterlockedCompareExchange 所做的是,原子地确保该值仍然是您在状态更新之前开始使用的值,如果它仍然是该值,则使用新状态原子地更新该值。通常,这被包裹在一个无限循环中,该循环一直在尝试,直到其他人没有更改中间的值。这是大致的模式:
union State
{
struct
{
short a;
short b;
};
uint32_t volatile rawState;
} state;
void DoSomething()
{
// Keep looping until nobody else changed it behind our back
for (;;)
{
state origState;
state newState;
// It's important that you only read the state once per try
origState.rawState = state.rawState;
// This must copy origState, NOT read the state again
newState.rawState = origState.rawState;
// Now you can do something impossible to do atomically...
// This example takes a lot of cycles, there is huge
// opportunity for another thread to come in and change
// it during this update
if (newState.b == 3 || newState.b % 6 != 0)
{
newState.a++;
}
// Now we atomically update the state,
// this ONLY changes state.rawState if it's still == origState.rawState
// In either case, InterlockedCompareExchange returns what value it has now
if (InterlockedCompareExchange(&state.rawState, newState.rawState,
origState.rawState) == origState.rawState)
return;
}
}
(如果上面的代码实际上没有编译,请原谅 - 我把它写在我的头上)
伟大的。现在你可以让无锁算法变得简单。错误的!问题是您可以以原子方式更新的数据量受到严重限制。
一些无锁算法使用一种“帮助”并发线程的技术。例如,假设您有一个希望能够从多个线程更新的链表,其他线程可以通过对“first”和“last”指针进行更新来“帮助”它们,如果它们正在运行并看到它们是在“last”指向的节点上,但节点中的“next”指针不为空。在这个例子中,当注意到“最后一个”指针是错误的时,他们更新最后一个指针,只有当它仍然指向当前节点时,使用一个互锁的比较交换。
不要落入“旋转”或循环的陷阱(如自旋锁)。虽然短暂旋转很有价值,因为您希望“其他”线程完成某些事情 - 他们可能不会。“其他”线程可能已经进行了上下文切换,并且可能不再运行。您只是在消耗 CPU 时间,通过旋转直到条件为真来烧电(也许会杀死笔记本电脑的电池)。在你开始旋转的那一刻,你不妨扔掉你的无锁代码并用锁编写它。锁比无界旋转好。
只是为了从困难到荒谬,考虑一下你可能会让自己陷入其他架构的混乱——x86/x64 上的事情通常是相当宽容的,但是当你进入其他“弱排序”架构时,你就会进入事情发生的领域毫无意义 - 内存更新不会按程序顺序发生,因此您对其他线程正在做什么的所有心理推理都会消失。(即使 x86/x64 也有一种称为“写入组合”的内存类型,它通常在更新视频内存时使用,但可用于任何需要栅栏的内存缓冲区硬件)这些架构要求您使用“内存栅栏”操作来保证围栏之前的所有读/写/两者都将是全局可见的(由其他核心)。写栅栏保证栅栏之前的任何写入在栅栏之后的任何写入之前都是全局可见的。读栅栏将保证栅栏之后的读取不会在栅栏之前推测性地执行。读/写栅栏(又名完整栅栏或内存栅栏)将同时保证。栅栏非常昂贵。(有些人使用术语“障碍”而不是“围栏”)
我的建议是先用锁/条件变量来实现它。如果您在完美运行时遇到任何问题,那么尝试进行无锁实现是没有希望的。并且总是测量,测量,测量。您可能会发现使用锁实现的性能非常好 - 没有一些不稳定的无锁实现的不确定性和一个讨厌的挂起错误,只有在您向重要客户进行演示时才会出现。也许您可以通过将原始问题重新定义为更容易解决的问题来解决问题,也许可以通过重组工作以使更大的项目(或成批的项目)进入集合,从而减轻整个事情的压力。
Writing lockless concurrent algorithms is very difficult (as you've seen written 1000x elsewhere I'm sure). It is often not worth the effort either.