3

我有一个基于 pthread 的多线程程序,它有四个线程无限期地执行这个运行循环(伪代码):

while(keepRunning)
{
   pthread_barrier_wait(&g_stage_one_barrier);

   UpdateThisThreadsStateVariables();  

   pthread_barrier_wait(&g_stage_two_barrier);

   DoComputationsThatReadFromAllThreadsStateVariables();
}

这很有效,因为在第一阶段每个线程都会更新自己的状态变量,这没关系,因为在第一阶段没有其他线程正在读取任何其他线程的状态变量。然后在第二阶段,就读取彼此状态的线程而言,这是一个免费的,但这没关系,因为在第二阶段没有线程修改其本地状态变量,因此它们实际上是只读的。

我剩下的唯一问题是,当我的应用程序需要退出时,如何干净可靠地关闭这些线程?(通过“干净可靠”,我的意思是不引入潜在的死锁或竞争条件,理想情况下不必发送任何 UNIX 信号来强制线程退出 pthread_barrier_wait() 调用)

我的 main() 线程当然可以将每个线程的 keepRunning 设置为 false,但是它如何让 pthread_barrier_wait() 为每个线程返回?AFAICT 让 pthread_barrier_wait() 返回的唯一方法是将所有四个线程的执行位置同时放在 pthread_barrier_wait() 内,但是当某些线程可能已经退出时,这很难做到。

调用 pthread_barrier_destroy() 似乎是我想要做的,但是当任何线程都可能在屏障上等待时,这样做是未定义的行为。

这个问题有一个众所周知的解决方案吗?

4

4 回答 4

2

您可以有一个额外的线程在相同的障碍上同步,但仅作为“关闭主机”存在。您的工作线程将使用您在问题中的确切代码,并且“关闭主”线程将执行以下操作:

while (keepRunning)
{
    pthread_barrier_wait(&g_stage_one_barrier);

    pthread_mutex_lock(&mkr_lock);
    if (!mainKeepRunning)
        keepRunning = 0;
    pthread_mutex_unlock(&mkr_lock);

    pthread_barrier_wait(&g_stage_two_barrier);
}

当主线程希望其他线程关闭时,它会这样做:

pthread_mutex_lock(&mkr_lock);
mainKeepRunning = 0;
pthread_mutex_unlock(&mkr_lock);

(即,keepRunning变量成为共享线程状态的一部分,在第 2 阶段是只读的,在第 1 阶段归关闭主线程所有)。

当然,您也可以只选择其他线程之一作为“关闭主线程”,而不是为此目的使用专用线程。

于 2015-03-04T04:23:14.453 回答
2

有两个标志并使用类似以下的东西应该可以工作:

for (;;)
{
    pthread_barrier_wait(&g_stage_one_barrier);           +
                                                          |
    UpdateThisThreadsStateVariables();                    |
                                                          |
    pthread_mutex_lock(&shutdownMtx);                     | Zone 1
    pendingShutdown = !keepRunning;                       |
    pthread_mutex_unlock(&shutdownMtx);                   |
                                                          |
    pthread_barrier_wait(&g_stage_two_barrier);           +
                                                          |
    if (pendingShutdown)                                  |
        break;                                            | Zone 2
                                                          |
    DoComputationsThatReadFromAllThreadsStateVariables(); |
}

shutdownMtx应该保护keepRunningtoo 的设置,尽管它没有显示出来。

逻辑是,当时间pendingShutdown设置为 时true,所有线程都必须在区域 1内。(即使只有一些线程看到keepRunning存在也是false如此,所以比赛keepRunning应该没问题。)因此它们都会到达,然后当它们进入区域 2pthread_barrier_wait(&g_stage_two_barrier)时全部爆发。

也可以检查PTHREAD_BARRIER_SERIAL_THREAD-pthread_barrier_wait()恰好由其中一个线程返回 - 并且只pendingShutdown在该线程中进行锁定和更新,这可以提高性能。

于 2015-03-04T00:26:55.210 回答
1

存在需求冲突:屏障语义要求所有线程in继续运行,而当线程在执行块之间共享(可能位于不同的屏障内)时,关闭需要终止。

我建议用支持外部cancel调用的自定义实现替换屏障。

示例(可能无法运行,但想法...):

struct _barrier_entry
{
  pthread_cond_t cond;
  volatile bool released;
  volatile struct _barrier_entry *next;
};

typedef struct
{
  volatile int capacity;
  volatile int count;
  volatile struct _barrier_entry *first;
  pthread_mutex_t lock;
} custom_barrier_t;

初始化:

int custom_barrier_init(custom_barrier_t *barrier, int capacity)
{
   if (NULL == barrier || capacity <= 0)
   {
     errno = EINVAL;
     return -1;
   }
   barrier->capacity = capacity;
   barrier->count = 0;
   barrier->first = NULL;
   return pthread_mutex_init(&barrier->lock, NULL);
   return -1;
}

帮手:

static void _custom_barrier_flush(custom_barrier_t *barrier)
{
   struct _barrier_entry *ptr;
   for (ptr = barrier->first; NULL != ptr;)
   {
     struct _barrier_entry *next = ptr->next;
     ptr->released = true;
     pthread_cond_signal(&ptr->cond);
     ptr = next;
   }
   barrier->first = NULL;
   barrier->count = 0;
}

阻塞等待:

int custom_barrier_wait(custom_barrier_t *barrier)
{
   struct _barrier_entry entry;
   int result;
   pthread_cond_init(&barrier->entry, NULL);
   entry->next = NULL;
   entry->released = false;

   pthread_mutex_lock(&barrier->lock);
   barrier->count++;
   if (barrier->count == barrier->capacity)
   {
     _custom_barrier_flush(barrier);
     result = 0;
   }
   else
   {
     entry->next = barrier->first;
     barrier->first = entry;
     while (true)
     {
       pthread_cond_wait(&entry->cond, &barrier->lock);
       if (entry->released)
       {
         result = 0;
         break;
       }
       if (barrier->capacity < 0)
       {
         errno = ECANCELLED;
         result = -1;
         break;
       }
     }
   }
   pthread_mutex_unlock(&barrier->lock);
   pthread_cond_destroy(&entry->cond);
   return result;
}

消除:

 int custom_barrier_cancel(custom_barrier_t *barrier)
 {
   pthread_mutex_lock(barrier->lock);
   barrier->capacity = -1;
   _custom_barrier_flush(barrier);
   pthread_mutex_unlock(barrier->lock);
   return 0;
 }

所以线程代码可以在循环中运行,直到调用ECANCELLED后出错。custom_barrier_wait

于 2015-03-04T00:27:30.053 回答
0

在屏障处等待的线程不是问题,而是仍在运行UpdateThis...DoComputations...延迟关闭的线程。UpdateThis...您可以通过定期检查和DoComputations...函数内部的关闭来减少关闭时间。

这是一种可能的解决方案的概要

  • main 初始化一个互斥体g_shutdown_mutex
  • main 锁定互斥体
  • main 启动线程
  • 线程在定期尝试锁定互斥锁时做他们的事情,但由于 main 已锁定互斥锁,该trylock函数将始终失败
  • 当需要关闭时,main 会解锁互斥锁
  • 现在trylock将成功并且工作函数将提前返回
  • 在到达第二个屏障之前,任何成功锁定互斥锁的线程都会设置一个全局变量g_shutdown_requested
  • 通过第二道屏障后,所有线程将看到相同的值g_shutdown_requested并做出相同的决定是否退出

所以while循环看起来像这样

while(1)
{
    pthread_barrier_wait(&g_stage_one_barrier);

    UpdateThisThreadsStateVariables();

    if ( pthread_mutex_trylock( &g_shutdown_mutex ) == 0 )
    {
        g_shutdown_requested = true;
        pthread_mutex_unlock( &g_shutdown_mutex );
        break;
    }

    pthread_barrier_wait(&g_stage_two_barrier);

    if ( g_shutdown_requested )
        break;

    DoComputationsThatReadFromAllThreadsStateVariables();
}

工人函数看起来像这样

void UpdateThisThreadsStateVariables( void )
{
    for ( i = 0;; i++ )
    {
        // check the mutex once every 4000 times through the loop
        if ( (i & 0xfff) == 0 && pthread_mutex_trylock( &g_shutdown_mutex ) == 0 )
        {
            pthread_mutex_unlock( &g_shutdown_mutex );   // abnormal termination
            return; 
        }

        // do the important stuff here

        if ( doneWithTheImportantStuff )    // normal termination
            break;
    }
}
于 2015-03-03T23:59:03.587 回答