I'll take a crack at this with an example implementation of pthread_barrier_wait() that uses mutex and condition variable functionality as might be provided by a pthreads implementation. Note that this example doesn't try to deal with performance considerations (specifically, when the waiting threads are unblocked, they are all re-serialized when exiting the wait). I think that using something like Linux Futex objects could help with the performance issues, but Futexes are still largely outside my experience.
Also, I doubt that this example handles signals or errors correctly (if at all in the case of signals). But I think proper support for those things can be added as an exercise for the reader.
My main fear is that the example may have a race condition or deadlock (the mutex handling is more complex than I like). Also note that this is an example that hasn't even been compiled - treat it as pseudo-code. Keep in mind as well that my experience is mainly in Windows; I'm tackling this more as an educational opportunity than anything else, so the quality of the pseudo-code may well be quite low.
However, disclaimers aside, I think it may give an idea of how the problem posed in the question could be handled (i.e., how the pthread_barrier_wait() function can allow the pthread_barrier_t object it uses to be destroyed by any of the released threads without danger of one or more threads still using the barrier object on their way out).
Here goes:
/*
* Since this is a part of the implementation of the pthread API, it uses
* reserved names that start with "__" for internal structures and functions
*
* Functions such as __mutex_lock() and __cond_wait() perform the same function
* as the corresponding pthread API.
*/
// struct __barrier_waitdata is intended to hold all the data
// that `pthread_barrier_wait()` will need after releasing
// waiting threads. This will allow the function to avoid
// touching the passed in pthread_barrier_t object after
// the wait is satisfied (since any of the released threads
// can destroy it)
struct __barrier_waitdata {
    struct __mutex cond_mutex;
    struct __cond cond;

    unsigned waiter_count;
    int wait_complete;
};

struct __barrier {
    unsigned count;

    struct __mutex waitdata_mutex;
    struct __barrier_waitdata* pwaitdata;
};

typedef struct __barrier pthread_barrier_t;
int __barrier_waitdata_init( struct __barrier_waitdata* pwaitdata)
{
    int rc;

    pwaitdata->waiter_count = 0;
    pwaitdata->wait_complete = 0;

    rc = __mutex_init( &pwaitdata->cond_mutex, NULL);
    if (rc) {
        return rc;
    }

    rc = __cond_init( &pwaitdata->cond, NULL);
    if (rc) {
        __mutex_destroy( &pwaitdata->cond_mutex);
        return rc;
    }

    return 0;
}
int pthread_barrier_init(pthread_barrier_t *barrier, const pthread_barrierattr_t *attr, unsigned int count)
{
    int rc;

    rc = __mutex_init( &barrier->waitdata_mutex, NULL);
    if (rc) return rc;

    barrier->pwaitdata = NULL;
    barrier->count = count;

    //TODO: deal with attr

    return 0;
}
int pthread_barrier_wait(pthread_barrier_t *barrier)
{
    int rc;
    struct __barrier_waitdata* pwaitdata;
    unsigned target_count;

    // potential waitdata block (only one thread's will actually be used)
    struct __barrier_waitdata waitdata;

    // nothing to do if we only need to wait for one thread...
    if (barrier->count == 1) return PTHREAD_BARRIER_SERIAL_THREAD;
    rc = __mutex_lock( &barrier->waitdata_mutex);
    if (rc) return rc;

    if (!barrier->pwaitdata) {
        // no other thread has claimed the waitdata block yet -
        //  we'll use this thread's
        rc = __barrier_waitdata_init( &waitdata);
        if (rc) {
            __mutex_unlock( &barrier->waitdata_mutex);
            return rc;
        }

        barrier->pwaitdata = &waitdata;
    }

    pwaitdata = barrier->pwaitdata;
    target_count = barrier->count;
    // all data necessary for handling the return from a wait is pointed to
    //  by `pwaitdata`, and `pwaitdata` points to a block of data on the stack of
    //  one of the waiting threads. We have to make sure that the thread that owns
    //  that block waits until all others have finished with the information
    //  pointed to by `pwaitdata` before it returns. However, after the 'big' wait
    //  is completed, the `pthread_barrier_t` object that's passed into this
    //  function isn't used. The last operation done to `*barrier` is to set
    //  `barrier->pwaitdata = NULL` to satisfy the requirement that this function
    //  leaves `*barrier` in a state as if `pthread_barrier_init()` had been called - and
    //  that operation is done by the thread that signals the wait condition
    //  completion before the completion is signaled.

    // note: we're still holding `barrier->waitdata_mutex`
    rc = __mutex_lock( &pwaitdata->cond_mutex);
    if (rc) {
        __mutex_unlock( &barrier->waitdata_mutex);
        return rc;
    }

    pwaitdata->waiter_count += 1;

    if (pwaitdata->waiter_count < target_count) {
        // need to wait for other threads

        __mutex_unlock( &barrier->waitdata_mutex);
        do {
            // TODO: handle the return code from `__cond_wait()` to break out of this
            //  if a signal makes that necessary
            __cond_wait( &pwaitdata->cond, &pwaitdata->cond_mutex);
        } while (!pwaitdata->wait_complete);
    }
    else {
        // this thread satisfies the wait - unblock all the other waiters
        pwaitdata->wait_complete = 1;

        // 'release' our use of the passed in pthread_barrier_t object
        barrier->pwaitdata = NULL;

        // unlock the barrier's waitdata_mutex - the barrier is
        //  ready for use by another set of threads
        __mutex_unlock( &barrier->waitdata_mutex);

        // finally, unblock the waiting threads
        __cond_broadcast( &pwaitdata->cond);
    }
    // at this point, barrier->waitdata_mutex is unlocked, the
    //  barrier->pwaitdata pointer has been cleared, and no further
    //  use of `*barrier` is permitted...

    // however, each thread still has a valid `pwaitdata` pointer - the
    //  thread that owns that block needs to wait until all others have
    //  dropped the pwaitdata->waiter_count

    // also, at this point the `pwaitdata->cond_mutex` is locked, so
    //  we're in a critical section

    rc = 0;
    pwaitdata->waiter_count--;

    if (pwaitdata == &waitdata) {
        // this thread owns the waitdata block - it needs to hang around until
        //  all other threads are done

        // as a convenience, this thread will be the one that returns
        //  PTHREAD_BARRIER_SERIAL_THREAD
        rc = PTHREAD_BARRIER_SERIAL_THREAD;

        while (pwaitdata->waiter_count != 0) {
            __cond_wait( &pwaitdata->cond, &pwaitdata->cond_mutex);
        }

        __mutex_unlock( &pwaitdata->cond_mutex);
        __cond_destroy( &pwaitdata->cond);
        __mutex_destroy( &pwaitdata->cond_mutex);
    }
    else {
        if (pwaitdata->waiter_count == 0) {
            // this is the last thread (other than the block's owner) that's
            //  done with the waitdata - wake the owner so it can clean up
            __cond_signal( &pwaitdata->cond);
        }

        // every non-owning thread must release the mutex on its way out,
        //  otherwise the remaining threads would deadlock here
        __mutex_unlock( &pwaitdata->cond_mutex);
    }

    return rc;
}
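To make the scenario from the question concrete, here's a small, hypothetical usage sketch (not part of the implementation above; it just uses the plain pthread API, and worker() and THREAD_COUNT are made up for illustration). The one released thread that receives PTHREAD_BARRIER_SERIAL_THREAD destroys and frees the barrier while the other threads may still be on their way out of pthread_barrier_wait() - which is exactly the situation the implementation above has to make safe:

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

#define THREAD_COUNT 4

// hypothetical worker - every thread waits on the same heap-allocated barrier
static void* worker(void* arg)
{
    pthread_barrier_t* barrier = arg;

    int rc = pthread_barrier_wait(barrier);
    if (rc == PTHREAD_BARRIER_SERIAL_THREAD) {
        // exactly one released thread tears the barrier down; the other
        //  threads may still be returning from pthread_barrier_wait()
        pthread_barrier_destroy(barrier);
        free(barrier);
        printf("barrier destroyed by the serial thread\n");
    }
    return NULL;
}

int main(void)
{
    pthread_t threads[THREAD_COUNT];
    pthread_barrier_t* barrier = malloc(sizeof *barrier);

    pthread_barrier_init(barrier, NULL, THREAD_COUNT);

    for (int i = 0; i < THREAD_COUNT; ++i)
        pthread_create(&threads[i], NULL, worker, barrier);
    for (int i = 0; i < THREAD_COUNT; ++i)
        pthread_join(threads[i], NULL);

    return 0;
}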
Update 17 July 2011, in response to a comment/question about process-shared barriers
I had completely forgotten about the situation where barriers are shared between processes. And as you mention, the idea I outlined would fail horribly in that case. I don't really have experience with POSIX shared memory use, so any suggestions I make should be taken with skepticism.
To summarize (for my own benefit, if no one else's):
When any thread gets control after pthread_barrier_wait() returns, the barrier object needs to be in the 'init' state (i.e., in whatever state the most recent pthread_barrier_init() on that object set it to). Also implied by the API is that once any thread returns, one or more of the following things could occur:
- another call to pthread_barrier_wait() to start a new round of thread synchronization
- pthread_barrier_destroy() on the barrier object
- the memory allocated for the barrier object could be freed or unshared if it's in a shared memory region
These things mean that before the pthread_barrier_wait() call allows any thread to return, it pretty much needs to ensure that all waiting threads are no longer using the barrier object in the context of that call. My first answer addressed this by creating a 'local' set of synchronization objects (a mutex and an associated condition variable) outside the barrier object that would block all the threads. These local synchronization objects are allocated on the stack of whichever thread happens to call pthread_barrier_wait() first.
I think that something similar would need to be done for process-shared barriers. However, in that case simply allocating those sync objects on a thread's stack isn't adequate (since the other processes would have no access to them). For a process-shared barrier, those objects would have to be allocated in process-shared memory. I think the technique I listed above could be applied similarly:
- the waitdata_mutex that controls the 'allocation' of the local sync variables (the waitdata block) would already be in process-shared memory by virtue of being in the barrier struct. Of course, when the barrier is set to PTHREAD_PROCESS_SHARED, that attribute would also need to be applied to the waitdata_mutex
- when __barrier_waitdata_init() is called to initialize the local mutex and condition variable, it would have to allocate those objects in shared memory instead of simply using the stack-based waitdata variable (see the sketch after this list)
- when the 'cleanup' thread destroys the mutex and the condition variable in the waitdata block, it would also need to clean up the process-shared memory allocation for the block
- in the case where shared memory is used, there needs to be some mechanism to ensure that the shared memory object is opened at least once in each process, and closed the correct number of times in each process (but not closed entirely before every thread in the process is finished using it). I haven't worked out exactly how that would be done...
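As a very rough illustration of the second bullet above (and subject to the same disclaimer about my lack of POSIX shared-memory experience), a process-shared variant of __barrier_waitdata_init() might look something like the following. It stays at the pseudo-code level of the example above: __mutex_init()/__cond_init() are the same internal wrappers, and the name passed in would come from the naming scheme discussed below. The part I'm reasonably confident about is that the mutex and condition variable must be created with the PTHREAD_PROCESS_SHARED attribute.

/* sketch only - needs <sys/mman.h>, <fcntl.h>, <unistd.h>, <errno.h> */
int __barrier_waitdata_init_shared( struct __barrier_waitdata** ppwaitdata, const char* name)
{
    struct __barrier_waitdata* pwaitdata;
    pthread_mutexattr_t mattr;
    pthread_condattr_t cattr;

    // create and size a POSIX shared memory object to hold the waitdata block,
    //  then map it into this process
    int fd = shm_open( name, O_CREAT | O_EXCL | O_RDWR, 0600);
    if (fd == -1) return errno;
    if (ftruncate( fd, sizeof *pwaitdata) == -1) { int e = errno; close( fd); return e; }

    pwaitdata = mmap( NULL, sizeof *pwaitdata, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    close( fd);    // the mapping stays valid after the descriptor is closed
    if (pwaitdata == MAP_FAILED) return errno;

    pwaitdata->waiter_count = 0;
    pwaitdata->wait_complete = 0;

    // the local mutex & condition variable must themselves be process-shared
    pthread_mutexattr_init( &mattr);
    pthread_mutexattr_setpshared( &mattr, PTHREAD_PROCESS_SHARED);
    __mutex_init( &pwaitdata->cond_mutex, &mattr);
    pthread_mutexattr_destroy( &mattr);

    pthread_condattr_init( &cattr);
    pthread_condattr_setpshared( &cattr, PTHREAD_PROCESS_SHARED);
    __cond_init( &pwaitdata->cond, &cattr);
    pthread_condattr_destroy( &cattr);

    *ppwaitdata = pwaitdata;
    return 0;
}

The 'cleanup' thread would then munmap() the block and shm_unlink() the name instead of just destroying stack-based objects, and the open/close bookkeeping from the last bullet still needs to be layered on top of this.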
I think these changes would allow the scheme to operate with process-shared barriers. The last bullet point above is a key item to figure out. Another is how to construct a name for the shared memory object that will hold the 'local' process-shared waitdata (a rough sketch of that follows the list below). There are certain attributes you'd want for that name:
- you'd want the storage for the name to reside in the struct pthread_barrier_t structure itself so that all processes have access to it; that implies a known limit on the length of the name
- you'd want the name to be unique for each 'instance' of a set of calls to pthread_barrier_wait(), because a second round of waiting might start before all threads have gotten all the way out of the first round (so the process-shared memory block set up for the waitdata might not have been freed yet). So the name probably has to be based on things like the process id, thread id, the address of the barrier object, and an atomic counter
- I don't know whether or not there are security implications to having the name be 'guessable'. If so, some randomization would need to be added - I have no idea how much. Maybe you'd also need to hash the data mentioned above along with the random bits. Like I said, I really have no idea whether this matters.
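For what it's worth, a hypothetical name-building helper that combined those ingredients (with no randomization) could look something like the sketch below. The helper name, the buffer size, and the per-process counter are all made up, and note that treating pthread_t as an integer isn't portable - this is illustration only:

/* sketch only - needs <stdio.h>, <stdatomic.h>, <unistd.h>, <pthread.h> */
#define __WAITDATA_NAME_MAX 64              // known size limit; the buffer could live in pthread_barrier_t

static _Atomic unsigned long __waitdata_seq;    // per-process atomic counter

static void __waitdata_shm_name( const void* barrier, char* buf, size_t buflen)
{
    unsigned long seq = atomic_fetch_add( &__waitdata_seq, 1);

    // produces something like "/barrier-<pid>-<tid>-<barrier address>-<counter>"
    snprintf( buf, buflen, "/barrier-%ld-%lu-%p-%lu",
              (long)getpid(),
              (unsigned long)pthread_self(),    // not portable: pthread_t is opaque
              barrier,
              seq);
}

Whether a name like this is unique and safe enough is exactly the open question in the last bullet.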