1

例如,我有一些由多个线程同时计算的工作。

出于演示目的,该工作在 while 循环内执行。在单次迭代中,每个线程执行自己的部分工作,在下一次迭代开始之前,计数器应该增加一次。

我的问题是计数器由每个线程更新。

由于这似乎是一件相对简单的事情,我认为有一个“最佳实践”或通用方法来解决它?

这是一些示例代码来说明问题并帮助讨论。(我正在使用提升线程)

class someTask {
public:
    int mCounter; //initialized to 0
    int mTotal; //initialized to i.e. 100000
    boost::mutex cntmutex;                
    int getCount()
    {
            boost::mutex::scoped_lock lock( cntmutex );
            return mCount;
    }
    void process( int thread_id, int numThreads )
    {
        while ( getCount() < mTotal )
        {
            // The main task is performed here and is divided 
            // into sub-tasks based on the thread_id and numThreads

                            // Wait for all thread to get to this point

            cntmutex.lock();
            mCounter++;  // < ---- how to ensure this is only updated once?
            cntmutex.unlock();
        }
    }
};
4

4 回答 4

5

我在这里看到的主要问题是你的推理水平太低了。因此,我将提出一个基于新的 C++11 线程 API 的替代解决方案。

主要思想是你基本上有一个调度 -> 调度 -> 执行 -> 收集 -> 循环例程。在您的示例中,您尝试在do非常困难的阶段内对所有这些进行推理。使用相反的方法可以更容易地表达您的模式。

首先,我们将要完成的工作隔离在自己的例程中:

void process_thread(size_t id, size_t numThreads) {
    // do something
}

现在,我们可以轻松调用此例程:

#include <future>
#include <thread>
#include <vector>

void process(size_t const total, size_t const numThreads) {
    for (size_t count = 0; count != total; ++count) {
         std::vector< std::future<void> > results;

         // Create all threads, launch the work!
         for (size_t id = 0; id != numThreads; ++id) {
             results.push_back(std::async(process_thread, id, numThreads));
         }

         // The destruction of `std::future`
         // requires waiting for the task to complete (*)
    }
}

(*) 见这个问题

你可以在这里阅读更多关于std::async 这里的内容,这里提供了一个简短的介绍(它们在发布政策的效果上似乎有些矛盾,哦,好吧)。在这里让实现决定是否创建操作系统线程更简单:它可以根据可用内核的数量进行调整。

请注意如何通过删除共享状态来简化代码。因为线程不共享任何内容,我们不再需要显式地担心同步!

于 2012-05-06T10:06:40.383 回答
2

您使用互斥锁保护了计数器,确保没有两个线程可以同时访问计数器。您的另一个选择是使用Boost::atomicc++11 原子操作或特定于平台的原子操作。

但是,您的代码似乎mCounter无需持有互斥锁即可访问:

    while ( mCounter < mTotal )

那是个问题。您需要持有互斥锁才能访问共享状态。

你可能更喜欢使用这个成语:

  1. 获取锁。

  2. 做测试和其他事情来决定我们是否需要做工作。

  3. 调整会计以反映我们决定做的工作。

  4. 释放锁。做工作。获取锁。

  5. 调整会计以反映我们所做的工作。

  6. 除非我们完全完成,否则循环回到第 2 步。

  7. 释放锁。

于 2012-05-05T18:49:25.950 回答
1

您需要使用消息传递解决方案。TBB 或 PPL 等库更容易实现这一点。PPL 免费包含在 Visual Studio 2010 及更高版本中,TBB 可以根据英特尔的 FOSS 许可免费下载。

concurrent_queue<unsigned int> done;
std::vector<Work> work; 
// fill work here
parallel_for(0, work.size(), [&](unsigned int i) {
    processWorkItem(work[i]);
    done.push(i);
});

它是无锁的,您可以让外部线程监视done变量以查看已完成的数量和内容。

于 2012-05-06T15:53:36.617 回答
1

我想不同意大卫关于通过多次锁定获取来完成这项工作的观点。

Mutexes代价高昂并且有更多线程竞争 a mutex,它基本上会退回到系统调用,这会导致用户空间到内核空间上下文切换以及调用者线程(/s)被迫休眠:因此有很多开销。

因此,如果您使用的是多处理器系统,我强烈建议您使用自旋锁 [1]。

所以我要做的是:

=> 摆脱作用域锁获取来检查条件。

=> 让你的计数器不稳定以支持上面

=> 在 while 循环中,在获取锁后再次进行条件检查。

class someTask {
 public:
 volatile int mCounter; //initialized to 0       : Make your counter Volatile
 int mTotal; //initialized to i.e. 100000
 boost::mutex cntmutex;                

 void process( int thread_id, int numThreads )
 {
    while ( mCounter < mTotal ) //compare without acquiring lock
    {
        // The main task is performed here and is divided 
        // into sub-tasks based on the thread_id and numThreads

        cntmutex.lock();
        //Now compare again to make sure that the condition still holds
        //This would save all those acquisitions and lock release we did just to 
        //check whther the condition was true.
        if(mCounter < mTotal)
        {
             mCounter++;  
        }

        cntmutex.unlock();
    }
 }
};

[1]http://www.alexonlinux.com/pthread-mutex-vs-pthread-spinlock

于 2012-05-08T00:06:39.567 回答