I don't usually complain about people striving for fast code: it is usually a good exercise that results in a better understanding of programming and faster code.
I won't complain here either, but I can state unequivocally that the quest for a fast spinlock - 3 instructions long or more, at least on the x86 architecture - is a futile chase.
Here's why:
Acquiring a spinlock uses a typical code sequence:
lock_variable DD 0 ; 0 <=> free (a dword, matching the 32-bit xchg below)
mov ebx,offset lock_variable
mov eax,1
xchg eax,[ebx]
; if eax contains 0 (no one owned it) you own the lock,
; if eax contains 1 (someone already does) you don't
Releasing the spinlock is trivial:
mov ebx,offset lock_variable
mov dword ptr [ebx],0
The xchg instruction raises the lock pin on the processor, which in effect means "I want the bus for the next few clock cycles." This signal wends its way through the caches down to the slowest bus-mastering device, usually the PCI bus. When every bus-mastering device has completed, the locka (lock acknowledge) signal is sent back. Only then does the actual exchange take place. The problem is that the lock/locka sequence takes a VERY long time: the PCI bus may run at 33MHz with several cycles of latency, and on a 3.3 GHz CPU that means each PCI-bus cycle takes a hundred CPU cycles.
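To make the sequence concrete in C (an illustrative sketch, not part of the original code; it uses the GCC/Clang __atomic builtins), the same acquire/release pair looks like this - on x86 the exchange compiles to the same implicitly locked XCHG and the release to a plain MOV:

static volatile int lock_variable = 0;              /* 0 <=> free */

static int try_lock (void)
{
    /* compiles to an implicitly locked XCHG on x86; returns 1 on success */
    return __atomic_exchange_n (&lock_variable, 1, __ATOMIC_ACQUIRE) == 0;
}

static void unlock (void)
{
    /* a plain store (MOV) is sufficient to release the lock on x86 */
    __atomic_store_n (&lock_variable, 0, __ATOMIC_RELEASE);
}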
As a rule of thumb, I assume that a lock will take between 300 and 3000 CPU cycles to complete, and in the end I don't even know whether I'll own the lock. So the few cycles you might save with a "fast" spinlock will be a mirage, because no lock behaves like the next: it depends on the bus situation during that short window of time.
________________EDIT________________
I just read that the spinlock is a "heavily used object." Well, you obviously don't understand that a spinlock consumes an enormous number of CPU cycles every time it is invoked. Or, to put it another way, every time you invoke it you lose a significant amount of processing capability.
The trick when using spinlocks (or their bigger sibling, the critical section) is to use them as sparingly as possible while still achieving the intended program functionality. Using them all over the place is easy, and the result is lackluster performance.
It's not only about writing fast code, it's also about organizing your data. When you write "copy small structs between threads" you should realize that completing the lock may take hundreds of times longer than the actual copying.
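As an illustration (a sketch reusing the hypothetical try_lock/unlock helpers from above): keep the critical section down to the raw copy and do all further processing outside it, since acquiring the lock dominates the cost anyway.

struct small_struct { int a, b, c, d; };   /* hypothetical shared data */
static struct small_struct shared_data;

void read_snapshot (struct small_struct *out)
{
    while (!try_lock ()) {}   /* the expensive part */
    *out = shared_data;       /* the copy itself is comparatively free */
    unlock ();
    /* process *out here, outside the lock */
}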
________________EDIT________________
When you compute an average lock time it will probably say very little, since it is measured on your machine, which may not be the intended target (and which may have entirely different bus-usage characteristics). For your machine, the average will be made up of individual times ranging from very fast (when no bus-mastering activity interfered) all the way to very slow (when bus-mastering interference was heavy).
You could introduce code which determines the fastest and slowest cases and compute the quotient to see just how much spinlock times can vary.
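A minimal sketch of such instrumentation (mine, not measured code; it reuses the hypothetical try_lock/unlock helpers above and an RDTSC reader like the CPU_rdtsc function in the program further down):

#include <stdio.h>

static unsigned long long rdtsc_now (void)
{
    unsigned int lo, hi;
    __asm__ __volatile__("rdtsc" : "=a"(lo), "=d"(hi));
    return ((unsigned long long)hi << 32) | lo;
}

void report_lock_spread (void)
{
    unsigned long long t, tmin = ~0ull, tmax = 0ull;
    for (int i = 0; i < 1000000; ++i)
    {
        t = rdtsc_now ();
        while (!try_lock ()) {}     /* time the acquisition only */
        t = rdtsc_now () - t;
        unlock ();
        if (t < tmin) tmin = t;
        if (t > tmax) tmax = t;
    }
    printf ("fastest %llu, slowest %llu, quotient %.1f\n",
            tmin, tmax, (double)tmax/(double)tmin);
}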
________________EDIT________________
Update, May 2016.
Peter Cordes promoted the idea that "tuning a lock in the un-contended case can make sense" and that lock times of many hundreds of clock cycles do not occur on modern CPUs except in situations where the lock variable is misaligned. I began to wonder whether my earlier test program - written in 32-bit Watcom C - might have been hampered by WOW64, since it ran on a 64-bit OS: Windows 7.
So I wrote a 64-bit program and compiled it with TDM's gcc 5.3. The program uses the implicitly bus-locking instruction variant "XCHG r,m" for locking and a simple assignment "MOV m,r" for unlocking. In some lock variants the lock variable is pre-tested to determine whether it is even feasible to attempt a lock (using a simple compare "CMP r,m", which probably doesn't venture outside L3). Here it is:
// compiler flags used:
// -O1 -m64 -mthreads -mtune=k8 -march=k8 -fwhole-program -freorder-blocks -fschedule-insns -falign-functions=32 -g3 -Wall -c -fmessage-length=0
#define CLASSIC_BUS_LOCK
#define WHILE_PRETEST
//#define SINGLE_THREAD
typedef unsigned char u1;
typedef unsigned short u2;
typedef unsigned long u4;
typedef unsigned int ud;
typedef unsigned long long u8;
typedef signed char i1;
typedef short i2;
typedef long i4;
typedef int id;
typedef long long i8;
typedef float f4;
typedef double f8;
#define usizeof(a) ((ud)sizeof(a))
#define LOOPS 25000000
#include <stdio.h>
#include <string.h> /* memset */
#include <windows.h>
#ifndef bool
typedef signed char bool;
#endif
u8 CPU_rdtsc (void)
{
ud tickl, tickh;
__asm__ __volatile__("rdtsc":"=a"(tickl),"=d"(tickh));
return ((u8)tickh << 32)|tickl;
}
volatile u8 bus_lock (volatile u8 * block, u8 value)
{
__asm__ __volatile__( "xchgq %1,%0" : "=r" (value) : "m" (*block), "0" (value) : "memory");
return value;
}
void bus_unlock (volatile u8 * block, u8 value)
{
__asm__ __volatile__( "movq %0,%1" : "=r" (value) : "m" (*block), "0" (value) : "memory");
}
void rfence (void)
{
__asm__ __volatile__( "lfence" : : : "memory");
}
void rwfence (void)
{
__asm__ __volatile__( "mfence" : : : "memory");
}
void wfence (void)
{
__asm__ __volatile__( "sfence" : : : "memory");
}
volatile bool LOCK_spinlockPreTestIfFree (const volatile u8 *lockVariablePointer)
{
return (bool)(*lockVariablePointer == 0ull);
}
volatile bool LOCK_spinlockFailed (volatile u8 *lockVariablePointer)
{
return (bool)(bus_lock (lockVariablePointer, 1ull) != 0ull);
}
void LOCK_spinlockLeave (volatile u8 *lockVariablePointer)
{
*lockVariablePointer = 0ull;
}
static volatile u8 lockVariable = 0ull,
lockCounter = 0ull;
static volatile i8 threadHold = 1;
static u8 tstr[4][32]; /* 32*8=256 bytes for each thread's parameters should result in them residing in different cache lines */
struct LOCKING_THREAD_STRUCTURE
{
u8 numberOfFailures, numberOfPreTests;
f8 clocksPerLock, failuresPerLock, preTestsPerLock;
u8 threadId;
HANDLE threadHandle;
ud idx;
} *lts[4] = {(void *)tstr[0], (void *)tstr[1], (void *)tstr[2], (void *)tstr[3]};
DWORD WINAPI locking_thread (struct LOCKING_THREAD_STRUCTURE *ltsp)
{
ud n = LOOPS;
u8 clockCycles;
SetThreadAffinityMask (GetCurrentThread (), 1ull<<ltsp->idx); /* use GetCurrentThread: ltsp->threadHandle may not be written by main yet */
while (threadHold) {}
clockCycles = CPU_rdtsc ();
while (n)
{
Sleep (0);
#ifdef CLASSIC_BUS_LOCK
while (LOCK_spinlockFailed (&lockVariable)) {++ltsp->numberOfFailures;}
#else
#ifdef WHILE_PRETEST
while (1)
{
do
{
++ltsp->numberOfPreTests;
} while (!LOCK_spinlockPreTestIfFree (&lockVariable));
if (!LOCK_spinlockFailed (&lockVariable)) break;
++ltsp->numberOfFailures;
}
#else
while (1)
{
++ltsp->numberOfPreTests;
if (LOCK_spinlockPreTestIfFree (&lockVariable))
{
if (!LOCK_spinlockFailed (&lockVariable)) break;
++ltsp->numberOfFailures;
}
}
#endif
#endif
++lockCounter;
LOCK_spinlockLeave (&lockVariable);
#ifdef CLASSIC_BUS_LOCK
while (LOCK_spinlockFailed (&lockVariable)) {++ltsp->numberOfFailures;}
#else
#ifdef WHILE_PRETEST
while (1)
{
do
{
++ltsp->numberOfPreTests;
} while (!LOCK_spinlockPreTestIfFree (&lockVariable));
if (!LOCK_spinlockFailed (&lockVariable)) break;
++ltsp->numberOfFailures;
}
#else
while (1)
{
++ltsp->numberOfPreTests;
if (LOCK_spinlockPreTestIfFree (&lockVariable))
{
if (!LOCK_spinlockFailed (&lockVariable)) break;
++ltsp->numberOfFailures;
}
}
#endif
#endif
--lockCounter;
LOCK_spinlockLeave (&lockVariable);
n-=2;
}
clockCycles = CPU_rdtsc ()-clockCycles;
ltsp->clocksPerLock = (f8)clockCycles/ (f8)LOOPS;
ltsp->failuresPerLock = (f8)ltsp->numberOfFailures/(f8)LOOPS;
ltsp->preTestsPerLock = (f8)ltsp->numberOfPreTests/(f8)LOOPS;
//rwfence ();
ltsp->idx = 4u;
ExitThread (0);
return 0;
}
int main (int argc, char *argv[])
{
u8 processAffinityMask, systemAffinityMask;
memset (tstr, 0u, usizeof(tstr));
lts[0]->idx = 3;
lts[1]->idx = 2;
lts[2]->idx = 1;
lts[3]->idx = 0;
GetProcessAffinityMask (GetCurrentProcess(), &processAffinityMask, &systemAffinityMask);
SetPriorityClass (GetCurrentProcess(), HIGH_PRIORITY_CLASS);
SetThreadAffinityMask (GetCurrentThread (), 1ull);
lts[0]->threadHandle = CreateThread (NULL, 65536u, (void *)locking_thread, (void *)lts[0], 0, (void *)&lts[0]->threadId);
#ifndef SINGLE_THREAD
lts[1]->threadHandle = CreateThread (NULL, 65536u, (void *)locking_thread, (void *)lts[1], 0, (void *)&lts[1]->threadId);
lts[2]->threadHandle = CreateThread (NULL, 65536u, (void *)locking_thread, (void *)lts[2], 0, (void *)&lts[2]->threadId);
lts[3]->threadHandle = CreateThread (NULL, 65536u, (void *)locking_thread, (void *)lts[3], 0, (void *)&lts[3]->threadId);
#endif
SetThreadAffinityMask (GetCurrentThread (), processAffinityMask);
threadHold = 0;
#ifdef SINGLE_THREAD
while (lts[0]->idx<4u) {Sleep (1);}
#else
while (lts[0]->idx+lts[1]->idx+lts[2]->idx+lts[3]->idx<16u) {Sleep (1);}
#endif
printf ("T0:%1.1f,%1.1f,%1.1f\n", lts[0]->clocksPerLock, lts[0]->failuresPerLock, lts[0]->preTestsPerLock);
printf ("T1:%1.1f,%1.1f,%1.1f\n", lts[1]->clocksPerLock, lts[1]->failuresPerLock, lts[1]->preTestsPerLock);
printf ("T2:%1.1f,%1.1f,%1.1f\n", lts[2]->clocksPerLock, lts[2]->failuresPerLock, lts[2]->preTestsPerLock);
printf ("T3:%1.1f,%1.1f,%1.1f\n", lts[3]->clocksPerLock, lts[3]->failuresPerLock, lts[3]->preTestsPerLock);
printf ("T*:%1.1f,%1.1f,%1.1f\n", (lts[0]->clocksPerLock+ lts[1]->clocksPerLock+ lts[2]->clocksPerLock+ lts[3]->clocksPerLock)/ 4.,
(lts[0]->failuresPerLock+lts[1]->failuresPerLock+lts[2]->failuresPerLock+lts[3]->failuresPerLock)/4.,
(lts[0]->preTestsPerLock+lts[1]->preTestsPerLock+lts[2]->preTestsPerLock+lts[3]->preTestsPerLock)/4.);
printf ("LC:%u\n", (ud)lockCounter);
return 0;
}
The program was run on a DELL i5-4310U based computer with DDR3-800, 2 cores/2 HTs capable of 2.7GHz and a common L3 cache.
To begin with, the impact of WOW64 appears to have been negligible.
A single thread performing un-contended lock/unlocks was able to do so once every 110 cycles. Tuning the un-contended lock is useless: any code added to enhance the single XCHG instruction will only make it slower.
With four HTs bombarding the lock variable with lock attempts the situation changes radically. The time required to achieve a successful lock jumps to 994 cycles, a significant part of which can be attributed to the 2.2 failed lock attempts. To put it another way, in a high-contention situation, on average 3.2 locks must be attempted to achieve one successful lock. Obviously the 110 cycles have not become 110*3.2 but something closer to 110*9, so other mechanisms are at play here, just as in the tests on the older machine. Also, the average of 994 cycles encompasses a range between 716 and 1157 cycles.
The lock variants implementing pre-testing required about 95% of the cycles required by the simplest variant (XCHG). On average they would perform 17 CMPs to find it feasible to attempt 1.75 locks, of which 1 was successful. I recommend using pre-testing not only because it is faster: it imposes less strain on the bus-locking mechanism (3.2-1.75=1.45 fewer lock attempts) even though it increases the complexity slightly.
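Condensed to its essence, the pre-tested (test-and-test-and-set) pattern looks like the sketch below - a distillation, not the exact code measured above, and with a PAUSE added that the measured program does not use. It reuses the u8 type and the bus_lock function from the program:

void ttas_lock (volatile u8 *lockVariablePointer)
{
    for (;;)
    {
        /* pre-test: a plain read (the CMP) that stays within the cache
           hierarchy and imposes no bus lock */
        while (*lockVariablePointer != 0ull)
            __builtin_ia32_pause ();   /* eases pressure on the sibling HT */
        /* only now risk the expensive, implicitly locked XCHG */
        if (bus_lock (lockVariablePointer, 1ull) == 0ull) return;
    }
}

Unlocking remains the plain assignment shown in LOCK_spinlockLeave.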