4

我试图在我的代码中实现一个简单的屏障,如下所示:

void waitOnBarrier(int* barrier, int numberOfThreads) {
    atomicIncrement(barrier); // atomic increment implemented in assembly
    while(*barrier < numberOfThreads);
}

然后代码中有一个屏障用法:

int g_barrier = 0; // a global variable

waitOnBarrier(&g_barrier, someKnownNumberOfThreads);

到目前为止一切顺利,但我应该在哪里将我的g_barrier变量重置为零?如果我写类似

g_barrier = 0;

waitOnBarrier调用之后,如果其中一个线程比其他线程更快地从屏障中释放并在所有其他线程仍在执行循环指令时使g_barrier无效,那么我将遇到问题,因此最终它们将永远卡在屏障上.

解释: waitOnBarrier会编译成这样的东西(伪代码):

1: mov rax, numberOfThreads
2: mov rbx, [barrier]
3: cmp rax, rbx
4: jmp(if smaller) to 2

因此,如果我们有 2 个线程在屏障上同步,并且thread_1在指令 3 或 4 的某处变慢,而更快的thread_2到达屏障,通过它并继续到g_barrier无效流程。这意味着在thread_1到达指令 2 之后,它将在 [barrier] 处看到零值,并将永远卡在屏障上!

问题是,我应该如何取消g_barrier,它在代码中的哪个位置“足够远”,我可以确定到那时所有线程都离开了屏障?还是有更正确的方法来实施障碍?

4

4 回答 4

3

障碍实际上很难实现,主要原因是新的服务员可以在所有老服务员有机会执行之前开始到达,这排除了任何基于计数的简单实现。我首选的解决方案是让屏障对象本身简单地指向一个“当前屏障实例”,该实例存在于到达屏障的第一个线程的堆栈上,并且这也是最后一个离开的线程(因为它不能离开而其他线程线程仍在引用其堆栈)。就 pthread 原语(可以适应 C11 锁定原语或您必须使用的任何东西)而言,一个非常好的示例实现包含在 Michael Burr 对我过去关于该主题的问题的回答中:

https://stackoverflow.com/a/5902671/379897

是的,看起来工作量很大,但是编写一个真正满足屏障契约的屏障实现并非易事。

于 2014-10-07T11:28:10.870 回答
1

我在尝试做类似的事情时遇到了这个问题,所以我想我会分享我的解决方案,以防其他人发现它有用。它是在纯 C++11 中实现的(遗憾的是不是 C11,因为标准的多线程部分在 gcc 和 msvc 中尚不支持)。

基本上,您维护两个计数器,它们的用法是交替的。下面是实现和使用示例:

    #include <cstdio>
    #include <thread>
    #include <condition_variable>

    // A barrier class; The barrier is initialized with the number of expected threads to synchronize
    class barrier_t{
        const size_t count;
        size_t counter[2], * currCounter;
        std::mutex mutex;
        std::condition_variable cv;

    public:
        barrier_t(size_t count) : count(count), currCounter(&counter[0]) {
            counter[0] = count;
            counter[1] = 0;
        }
        void wait(){
            std::unique_lock<std::mutex> lock(mutex);
            if (!--*currCounter){
                currCounter += currCounter == counter ? 1 : -1;
                *currCounter = count;
                cv.notify_all();
            }
            else {
                size_t * currCounter_local = currCounter;
                cv.wait(lock, [currCounter_local]{return *currCounter_local == 0; });
            }
        }
    };

    void testBarrier(size_t iters, size_t threadIdx, barrier_t *B){
        for(size_t i = 0; i < iters; i++){
            printf("Hello from thread %i for the %ith time!\n", threadIdx, i);
            B->wait();
        }
    }

    int main(void){
        const size_t threadCnt = 4, iters = 8;
        barrier_t B(threadCnt);
        std::thread t[threadCnt];   
        for(size_t i = 0; i < threadCnt; i++) t[i] = std::thread(testBarrier, iters, i, &B);
        for(size_t i = 0; i < threadCnt; i++) t[i].join();
        return 0;
    }
于 2015-03-18T23:11:22.107 回答
0

尝试实施本书中解释的障碍解决方案:

信号量小书

于 2014-10-07T09:10:10.237 回答
0

不要将barrier变量重置为零。

当任何线程即将退出时,自动将barrier变量减一。

您的屏障看起来不希望产生的工作线程数低于numberOfThreads

于 2014-10-07T10:11:41.437 回答