0

我想以智能的方式使用 pthreads 在 C++ 中同步线程。

我有一个全局变量:

int Resources = 0;

我有两个线程函数:

void *incResources(void *arg)
{
   while(1)
   {
     pthread_mutex_lock (&resourcesMutex);
     Resources += 2;
     pthread_mutex_unlock (&resourcesMutex);
   }

pthread_exit((void*) 0);
}

void *consumeResources(void *arg)
{
   while(1)
   {
     pthread_mutex_lock (&resourcesMutex);
     Resources--;
     pthread_mutex_unlock (&resourcesMutex);
   }
   pthread_exit((void*) 0);
 }

在 main 函数中,我初始化了两个消耗线程和一个递增线程:

pthread_mutex_init(&resourcesMutex, NULL);

pthread_create(&callThd[0], &attr, incResources, (void *)i);
pthread_create(&callThd[1], &attr, consumeResources, (void *)i);
pthread_create(&callThd[2], &attr, consumeResources, (void *)i);

我觉得这太低效了,可以做得更好。你能给我一些想法吗?我试过使用等待,但我不明白:/谢谢!

4

4 回答 4

0

如果您正在寻找好的 C++ 方法,我强烈建议您阅读 C++ Concurrency in Action: by Anthony Williams并留下 pthread 以尽可能使用期货和类似的高级事物。如果你必须手动摆弄线程,你也可以找到很好的例子。

您的问题陈述对于明智的建议来说太模糊了——良好线程的基本思想是根本没有共享状态,并且对于像您这样的握手情况很可能,使用一些为此目的而制作的同步队列。

于 2013-05-28T17:19:56.910 回答
0

一个更聪明的方法是使用std::mutexand std::thread(或 Boost 等效项),因此您无需手动解锁互斥锁。

条件变量将允许消费者阻塞(不浪费 CPU 周期),直到有可用的工作:

struct Resource
{
  int value;
  std::mutex mx;
  std::condition_variable cv;
};

void incResources(Resource& res)
{
   while(1)
   {
     {
       std::lock_guard<std::mutex> l(res.mx);
       res.value += 2;
     }
     res.cv.notify_all();
   }
}

void consumeResources(Resource& res)
{
   while(1)
   {
     std::unique_lock<std::mutex> l(res.mx);
     while (res.value == 0)
       res.cv.wait(l);
     res.value--;
   }
}

在主线程中:

Resources res;
res.value = 0;
std::thread t1(incResources, std::ref(res));
std::thread t2(consumeResources, std::ref(res));
std::thread t3(consumeResources, std::ref(res));
// ...
t1.join();
t2.join();
t3.join();
于 2013-05-28T17:21:47.823 回答
0

我认为,如果您使用 C++,则没有理由更喜欢原生使用 pthreads 而不是 C++11std::thread和 STL 同步类。

如果您不能使用 C++11 标准,您应该将 pthreads 本机接口包装为合理的 C++ 类表示(参见例如boost::threadSTTCL Posix 线程实现)。

于 2013-05-28T17:22:38.170 回答
0

看起来您正在尝试实现生产者和消费者, += 线程创建工作(要减少的数字)并且消费者将它们带走。

与其让消费者处于这样的微不足道的自旋循环中,不如看看条件变量。

std::queue<Job*> queue;
pthread_mutex mutex;
pthread_cond cond;

void AddJob(Job* job) {
    pthread_mutex_lock(&mutex);
    queue.push_back(job);
    pthread_cond_signal(&cond);
    pthread_mutex_unlock(&mutex);
}

void* QueueWorker(void* /*threadInfo*/) {
    Job* job = NULL;
    for (;;) {
        pthread_mutex_lock(&mutex);
        while ( queue.empty() ) {
            // unlock the mutex until the cond is signal()d or broadcast() to.
            // if this call succeeds, we will have the mutex locked again on the other side.
            pthread_cond_wait(&cond, &mutex);
        }
        // take the first task and then release the lock.
        job = queue.pop();
        pthread_mutex_unlock(&mutex);

        if ( job != NULL )
           job->Execute();
    }

    return NULL;
}

这可以扩展到多个消费者。

顺便说一句,虽然熟悉 pthreads 实现可能很有用,但您可能应该查看可用的线程包装器之一。C++11 引入了 std::thread 和 std::mutex,许多人都对 boost 发誓,但我个人发现 OpenSceneGraph 团队的“OpenThreads”库是最容易使用和最优雅的库之一。

编辑:这是一个完整的工作实现,尽管有一些人为的机制来结束运行。

#include <queue>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

static int jobNo = 0;
class Job {
public:
    Job() : m_i(++jobNo) { printf("Created job %d.\n", m_i); }
    int m_i;
    void Execute() { printf("Job %d executing.\n", m_i); usleep(500 * 1000); }
};

std::queue<Job*> queue;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

void AddJob(Job* job) {
    pthread_mutex_lock(&mutex);
    queue.push(job);
    pthread_cond_signal(&cond);
    pthread_mutex_unlock(&mutex);
}

void* QueueWorker(void* /*threadInfo*/) {
    Job* job = NULL;
    for (;;) {
        pthread_mutex_lock(&mutex);
        while ( queue.empty() ) {
            // unlock the mutex until the cond is signal()d or broadcast() to.
            // if this call succeeds, we will have the mutex locked again on the other side.
            pthread_cond_wait(&cond, &mutex);
        }
        // take the first task and then release the lock.
        job = queue.front();
        queue.pop();
        pthread_mutex_unlock(&mutex);

        if ( job == NULL ) {
            // in this demonstration, NULL ends the run, so forward to any other threads.
            AddJob(NULL);
            break;
        }
        job->Execute();
        delete job;
    }
    return NULL;
}

int main(int argc, const char* argv[]) {
    pthread_t worker1, worker2;
    pthread_create(&worker1, NULL, &QueueWorker, NULL);
    pthread_create(&worker2, NULL, &QueueWorker, NULL);

    srand(time(NULL));

    // queue 5 jobs with delays.
    for ( size_t i = 0; i < 5; ++i ) {
        long delay = (rand() % 800) * 1000;
        printf("Producer sleeping %fs\n", (float)delay / (1000*1000));
        usleep(delay);
        Job* job = new Job();
        AddJob(job);
    }
    // 5 more without delays.
    for ( size_t i = 0; i < 5; ++i ) {
        AddJob(new Job);
    }
    // null to end the run.
    AddJob(NULL);

    printf("Done with jobs.\n");
    pthread_join(worker1, NULL);
    pthread_join(worker2, NULL);

    return 0;
}
于 2013-05-28T19:48:08.437 回答