5

我正在使用一个简单的程序进行一些测试,该程序使用 atomic_add_64 与互斥锁方法测量 64 位值上的简单原子增量的性能。令我困惑的是 atomic_add 比互斥锁慢 2 倍。

编辑!!!我又做了一些测试。看起来原子比互斥锁更快,并且可以扩展到 8 个并发线程。之后,原子的性能显着下降。

我测试过的平台是:

SunOS 5.10 Generic_141444-09 sun4u sparc SUNW,Sun-Fire-V490

CC:Sun C++ 5.9 SunOS_sparc 补丁 124863-03 2008/03/12

该程序非常简单:

#include <stdio.h>
#include <stdint.h>
#include <pthread.h>
#include <atomic.h>

uint64_t        g_Loops = 1000000;
volatile uint64_t       g_Counter = 0;
volatile uint32_t       g_Threads = 20;

pthread_mutex_t g_Mutex;
pthread_mutex_t g_CondMutex;
pthread_cond_t  g_Condition;

void LockMutex() 
{ 
  pthread_mutex_lock(&g_Mutex); 
}

void UnlockMutex() 
{ 
   pthread_mutex_unlock(&g_Mutex); 
}

void InitCond()
{
   pthread_mutex_init(&g_CondMutex, 0);
   pthread_cond_init(&g_Condition, 0);
}

void SignalThreadEnded()
{
   pthread_mutex_lock(&g_CondMutex);
   --g_Threads;
   pthread_cond_signal(&g_Condition);
   pthread_mutex_unlock(&g_CondMutex);
}

void* ThreadFuncMutex(void* arg)
{
   uint64_t counter = g_Loops;
   while(counter--)
   {
      LockMutex();
      ++g_Counter;
      UnlockMutex();
   }
   SignalThreadEnded();
   return 0;
}

void* ThreadFuncAtomic(void* arg)
{
   uint64_t counter = g_Loops;
   while(counter--)
   {
      atomic_add_64(&g_Counter, 1);
   }
   SignalThreadEnded();
   return 0;
}


int main(int argc, char** argv)
{
   pthread_mutex_init(&g_Mutex, 0);
   InitCond();
   bool bMutexRun = true;
   if(argc > 1)
   {
      bMutexRun = false;
      printf("Atomic run!\n");
   }
   else
        printf("Mutex run!\n");

   // start threads
   uint32_t threads = g_Threads;
   while(threads--)
   {
      pthread_t thr;
      if(bMutexRun)
         pthread_create(&thr, 0,ThreadFuncMutex, 0);
      else
         pthread_create(&thr, 0,ThreadFuncAtomic, 0);
   }
   pthread_mutex_lock(&g_CondMutex);
   while(g_Threads)
   {
      pthread_cond_wait(&g_Condition, &g_CondMutex);
      printf("Threads to go %d\n", g_Threads);
   }
   printf("DONE! g_Counter=%ld\n", (long)g_Counter);
}

在我们的盒子上运行的测试结果是:

$ CC -o atomictest atomictest.C
$ time ./atomictest
Mutex run!
Threads to go 19
...
Threads to go 0
DONE! g_Counter=20000000

real    0m15.684s
user    0m52.748s
sys     0m0.396s

$ time ./atomictest 1
Atomic run!
Threads to go 19
...
Threads to go 0
DONE! g_Counter=20000000

real    0m24.442s
user    3m14.496s
sys     0m0.068s

您在 Solaris 上遇到过这种类型的性能差异吗?任何想法为什么会发生这种情况?

在 Linux 上,相同的代码(使用 gcc __sync_fetch_and_add)与互斥锁版本相比,性能提高了 5 倍。

谢谢,奥克塔夫

4

1 回答 1

4

你必须小心这里发生的事情。

  1. 创建线程需要大量时间。因此,可能并非所有线程都同时执行。作为证据,我拿了你的代码并删除了互斥锁,每次运行它时都得到了正确的答案。这意味着没有一个线程同时执行!您不应该计算在测试中创建/销毁线程的时间。在开始测试之前,您应该等到所有线程都创建并运行。

  2. 你的测试不公平。您的测试人为地具有非常高的锁争用。无论出于何种原因,原子 add_and_fetch 在这种情况下都会受到影响。在现实生活中,你会在线程中做一些工作。一旦你添加了一点点工作,原子操作的性能就会好很多。这是因为出现竞态条件的机会显着下降。没有争用时,原子操作的开销较低。当没有争用时,互斥锁比原子操作有更多的开销。

  3. 线程数。运行的线程越少,争用就越低。这就是为什么在这个测试中更少的线程对原子性能更好。您的 8 线程数可能是您的系统支持的同时线程数。这可能不是因为您的测试如此偏向于争用。在我看来,您的测试将扩展到允许的同时线程数,然后达到稳定状态。我无法弄清楚的一件事是,为什么当线程数高于系统可以处理的同时线程数时,我们看不到线程休眠时互斥锁被锁定的情况的证据。也许我们会,我只是看不到它发生。

最重要的是,在大多数现实生活中,原子的速度要快得多。当您必须长时间持有锁时,它们并不是很好……无论如何您都应该避免这种情况(至少在我看来!)

我更改了您的代码,这样您就可以在没有工作、几乎没有任何工作的情况下进行测试,以及更多的工作以及更改线程数。

6sm = 6 个线程,几乎没有任何工作,互斥锁 6s = 6 个线程,几乎没有任何工作,原子

使用 capitol S 获得更多工作,没有 s 获得不工作。

这些结果表明,对于 10 个线程,工作量会影响原子的速度。在第一种情况下,没有工作,原子速度几乎没有。加一点工作,差距翻倍到 6 秒,加倍工作,几乎达到 10 秒。

(2) /dev_tools/Users/c698174/temp/atomic 
[c698174@shldvgfas007] $ t=10; a.out $t ; a.out "$t"m
ATOMIC FAST g_Counter=10000000 13.6520 s
MUTEX  FAST g_Counter=10000000 15.2760 s

(2) /dev_tools/Users/c698174/temp/atomic 
[c698174@shldvgfas007] $ t=10s; a.out $t ; a.out "$t"m
ATOMIC slow g_Counter=10000000 11.4957 s
MUTEX  slow g_Counter=10000000 17.9419 s

(2) /dev_tools/Users/c698174/temp/atomic 
[c698174@shldvgfas007] $ t=10S; a.out $t ; a.out "$t"m
ATOMIC SLOW g_Counter=10000000 14.7108 s
MUTEX  SLOW g_Counter=10000000 23.8762 s

20 个线程,原子仍然更好,但幅度较小。没有工作,它们的速度几乎相同。经过大量工作,原子再次领先。

(2) /dev_tools/Users/c698174/temp/atomic 
[c698174@shldvgfas007] $ t=20; a.out $t ; a.out "$t"m
ATOMIC FAST g_Counter=20000000 27.6267 s
MUTEX  FAST g_Counter=20000000 30.5569 s

(2) /dev_tools/Users/c698174/temp/atomic 
[c698174@shldvgfas007] $ t=20S; a.out $t ; a.out "$t"m
ATOMIC SLOW g_Counter=20000000 35.3514 s
MUTEX  SLOW g_Counter=20000000 48.7594 s

2个线程。原子占主导地位。

(2) /dev_tools/Users/c698174/temp/atomic 
[c698174@shldvgfas007] $ t=2S; a.out $t ; a.out "$t"m
ATOMIC SLOW g_Counter=2000000 0.6007 s
MUTEX  SLOW g_Counter=2000000 1.4966 s

这是代码(redhat linux,使用 gcc atomics):

#include <stdio.h>
#include <stdint.h>
#include <string.h>
#include <pthread.h>

volatile uint64_t __attribute__((aligned (64))) g_Loops = 1000000 ;
volatile uint64_t __attribute__((aligned (64))) g_Counter = 0;
volatile uint32_t __attribute__((aligned (64))) g_Threads = 7; 
volatile uint32_t __attribute__((aligned (64))) g_Active = 0;
volatile uint32_t __attribute__((aligned (64))) g_fGo = 0;
int g_fSlow = 0;

#define true 1
#define false 0
#define NANOSEC(t) (1000000000ULL * (t).tv_sec + (t).tv_nsec)

pthread_mutex_t g_Mutex;
pthread_mutex_t g_CondMutex;
pthread_cond_t  g_Condition;

void LockMutex() 
{ 
  pthread_mutex_lock(&g_Mutex); 
}

void UnlockMutex() 
{ 
   pthread_mutex_unlock(&g_Mutex); 
}

void Start(struct timespec *pT)
{
   int cActive = __sync_add_and_fetch(&g_Active, 1);
   while(!g_fGo) {} 
   clock_gettime(CLOCK_THREAD_CPUTIME_ID, pT);
}

uint64_t End(struct timespec *pT)
{
   struct timespec T;
   int cActive = __sync_sub_and_fetch(&g_Active, 1);
   clock_gettime(CLOCK_THREAD_CPUTIME_ID, &T);
   return NANOSEC(T) - NANOSEC(*pT);
}
void Work(double *x, double z)
{
      *x += z;
      *x /= 27.6;
      if ((uint64_t)(*x + .5) - (uint64_t)*x != 0)
        *x += .7;
}
void* ThreadFuncMutex(void* arg)
{
   struct timespec T;
   uint64_t counter = g_Loops;
   double x = 0, z = 0;
   int fSlow = g_fSlow;

   Start(&T);
   if (!fSlow) {
     while(counter--) {
        LockMutex();
        ++g_Counter;
        UnlockMutex();
     }
   } else {
     while(counter--) {
        if (fSlow==2) Work(&x, z);
        LockMutex();
        ++g_Counter;
        z = g_Counter;
        UnlockMutex();
     }
   }
   *(uint64_t*)arg = End(&T);
   return (void*)(int)x;
}

void* ThreadFuncAtomic(void* arg)
{
   struct timespec T;
   uint64_t counter = g_Loops;
   double x = 0, z = 0;
   int fSlow = g_fSlow;

   Start(&T);
   if (!fSlow) {
     while(counter--) {
        __sync_add_and_fetch(&g_Counter, 1);
     }
   } else {
     while(counter--) {
        if (fSlow==2) Work(&x, z);
        z = __sync_add_and_fetch(&g_Counter, 1);
     }
   }
   *(uint64_t*)arg = End(&T);
   return (void*)(int)x;
}


int main(int argc, char** argv)
{
   int i;
   int bMutexRun = strchr(argv[1], 'm') != NULL;
   pthread_t thr[1000];
   uint64_t aT[1000];
   g_Threads = atoi(argv[1]);
   g_fSlow = (strchr(argv[1], 's') != NULL) ? 1 : ((strchr(argv[1], 'S') != NULL) ? 2 : 0);

   // start threads
   pthread_mutex_init(&g_Mutex, 0);
   for (i=0 ; i<g_Threads ; ++i)
         pthread_create(&thr[i], 0, (bMutexRun) ? ThreadFuncMutex : ThreadFuncAtomic, &aT[i]);

   // wait
   while (g_Active != g_Threads) {}
   g_fGo = 1;
   while (g_Active != 0) {}

   uint64_t nTot = 0;
   for (i=0 ; i<g_Threads ; ++i)
   { 
        pthread_join(thr[i], NULL);
        nTot += aT[i];
   }
   // done 
   printf("%s %s g_Counter=%llu %2.4lf s\n", (bMutexRun) ? "MUTEX " : "ATOMIC", 
    (g_fSlow == 2) ? "SLOW" : ((g_fSlow == 1) ? "slow" : "FAST"), g_Counter, (double)nTot/1e9);
}
于 2012-04-28T04:37:13.283 回答